...

Source file src/github.com/apache/arrow/go/v15/parquet/schema/reflection.go

Documentation: github.com/apache/arrow/go/v15/parquet/schema

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package schema
    18  
    19  import (
    20  	"fmt"
    21  	"reflect"
    22  	"strconv"
    23  	"strings"
    24  
    25  	"github.com/apache/arrow/go/v15/arrow/float16"
    26  	"github.com/apache/arrow/go/v15/parquet"
    27  	format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
    28  	"golang.org/x/xerrors"
    29  )
    30  
    31  type taggedInfo struct {
    32  	Name string
    33  
    34  	Type      parquet.Type
    35  	KeyType   parquet.Type
    36  	ValueType parquet.Type
    37  
    38  	Length      int32
    39  	KeyLength   int32
    40  	ValueLength int32
    41  
    42  	Scale      int32
    43  	KeyScale   int32
    44  	ValueScale int32
    45  
    46  	Precision      int32
    47  	KeyPrecision   int32
    48  	ValuePrecision int32
    49  
    50  	FieldID      int32
    51  	KeyFieldID   int32
    52  	ValueFieldID int32
    53  
    54  	RepetitionType  parquet.Repetition
    55  	ValueRepetition parquet.Repetition
    56  
    57  	Converted      ConvertedType
    58  	KeyConverted   ConvertedType
    59  	ValueConverted ConvertedType
    60  
    61  	LogicalFields      map[string]string
    62  	KeyLogicalFields   map[string]string
    63  	ValueLogicalFields map[string]string
    64  
    65  	LogicalType      LogicalType
    66  	KeyLogicalType   LogicalType
    67  	ValueLogicalType LogicalType
    68  
    69  	Exclude bool
    70  }
    71  
    72  func (t *taggedInfo) CopyForKey() (ret taggedInfo) {
    73  	ret = *t
    74  	ret.Type = t.KeyType
    75  	ret.Length = t.KeyLength
    76  	ret.Scale = t.KeyScale
    77  	ret.Precision = t.KeyPrecision
    78  	ret.FieldID = t.KeyFieldID
    79  	ret.RepetitionType = parquet.Repetitions.Required
    80  	ret.Converted = t.KeyConverted
    81  	ret.LogicalType = t.KeyLogicalType
    82  	return
    83  }
    84  
    85  func (t *taggedInfo) CopyForValue() (ret taggedInfo) {
    86  	ret = *t
    87  	ret.Type = t.ValueType
    88  	ret.Length = t.ValueLength
    89  	ret.Scale = t.ValueScale
    90  	ret.Precision = t.ValuePrecision
    91  	ret.FieldID = t.ValueFieldID
    92  	ret.RepetitionType = t.ValueRepetition
    93  	ret.Converted = t.ValueConverted
    94  	ret.LogicalType = t.ValueLogicalType
    95  	return
    96  }
    97  
    98  func (t *taggedInfo) UpdateLogicalTypes() {
    99  	processLogicalType := func(fields map[string]string, precision, scale int32) LogicalType {
   100  		t, ok := fields["type"]
   101  		if !ok {
   102  			return NoLogicalType{}
   103  		}
   104  
   105  		switch strings.ToLower(t) {
   106  		case "string":
   107  			return StringLogicalType{}
   108  		case "map":
   109  			return MapLogicalType{}
   110  		case "list":
   111  			return ListLogicalType{}
   112  		case "enum":
   113  			return EnumLogicalType{}
   114  		case "decimal":
   115  			if v, ok := fields["precision"]; ok {
   116  				precision = int32FromType(v)
   117  			}
   118  			if v, ok := fields["scale"]; ok {
   119  				scale = int32FromType(v)
   120  			}
   121  			return NewDecimalLogicalType(precision, scale)
   122  		case "date":
   123  			return DateLogicalType{}
   124  		case "time":
   125  			unit, ok := fields["unit"]
   126  			if !ok {
   127  				panic("must specify unit for time logical type")
   128  			}
   129  			adjustedToUtc, ok := fields["isadjustedutc"]
   130  			if !ok {
   131  				adjustedToUtc = "true"
   132  			}
   133  			return NewTimeLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit)))
   134  		case "timestamp":
   135  			unit, ok := fields["unit"]
   136  			if !ok {
   137  				panic("must specify unit for time logical type")
   138  			}
   139  			adjustedToUtc, ok := fields["isadjustedutc"]
   140  			if !ok {
   141  				adjustedToUtc = "true"
   142  			}
   143  			return NewTimestampLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(unit))
   144  		case "integer":
   145  			width, ok := fields["bitwidth"]
   146  			if !ok {
   147  				panic("must specify bitwidth if explicitly setting integer logical type")
   148  			}
   149  			signed, ok := fields["signed"]
   150  			if !ok {
   151  				signed = "true"
   152  			}
   153  
   154  			return NewIntLogicalType(int8(int32FromType(width)), boolFromStr(signed))
   155  		case "null":
   156  			return NullLogicalType{}
   157  		case "json":
   158  			return JSONLogicalType{}
   159  		case "bson":
   160  			return BSONLogicalType{}
   161  		case "uuid":
   162  			return UUIDLogicalType{}
   163  		case "float16":
   164  			return Float16LogicalType{}
   165  		default:
   166  			panic(fmt.Errorf("invalid logical type specified: %s", t))
   167  		}
   168  	}
   169  
   170  	t.LogicalType = processLogicalType(t.LogicalFields, t.Precision, t.Scale)
   171  	t.KeyLogicalType = processLogicalType(t.KeyLogicalFields, t.KeyPrecision, t.KeyScale)
   172  	t.ValueLogicalType = processLogicalType(t.ValueLogicalFields, t.ValuePrecision, t.ValueScale)
   173  }
   174  
   175  func newTaggedInfo() taggedInfo {
   176  	return taggedInfo{
   177  		Type:               parquet.Types.Undefined,
   178  		KeyType:            parquet.Types.Undefined,
   179  		ValueType:          parquet.Types.Undefined,
   180  		RepetitionType:     parquet.Repetitions.Undefined,
   181  		ValueRepetition:    parquet.Repetitions.Undefined,
   182  		Converted:          ConvertedTypes.NA,
   183  		KeyConverted:       ConvertedTypes.NA,
   184  		ValueConverted:     ConvertedTypes.NA,
   185  		FieldID:            -1,
   186  		KeyFieldID:         -1,
   187  		ValueFieldID:       -1,
   188  		LogicalFields:      make(map[string]string),
   189  		KeyLogicalFields:   make(map[string]string),
   190  		ValueLogicalFields: make(map[string]string),
   191  		LogicalType:        NoLogicalType{},
   192  		KeyLogicalType:     NoLogicalType{},
   193  		ValueLogicalType:   NoLogicalType{},
   194  		Exclude:            false,
   195  	}
   196  }
   197  
   198  var int32FromType = func(v string) int32 {
   199  	val, err := strconv.Atoi(v)
   200  	if err != nil {
   201  		panic(err)
   202  	}
   203  	return int32(val)
   204  }
   205  
   206  var boolFromStr = func(v string) bool {
   207  	val, err := strconv.ParseBool(v)
   208  	if err != nil {
   209  		panic(err)
   210  	}
   211  	return val
   212  }
   213  
   214  func infoFromTags(f reflect.StructTag) *taggedInfo {
   215  	typeFromStr := func(v string) parquet.Type {
   216  		t, err := format.TypeFromString(strings.ToUpper(v))
   217  		if err != nil {
   218  			panic(fmt.Errorf("invalid type specified: %s", v))
   219  		}
   220  		return parquet.Type(t)
   221  	}
   222  
   223  	repFromStr := func(v string) parquet.Repetition {
   224  		r, err := format.FieldRepetitionTypeFromString(strings.ToUpper(v))
   225  		if err != nil {
   226  			panic(err)
   227  		}
   228  		return parquet.Repetition(r)
   229  	}
   230  
   231  	convertedFromStr := func(v string) ConvertedType {
   232  		c, err := format.ConvertedTypeFromString(strings.ToUpper(v))
   233  		if err != nil {
   234  			panic(err)
   235  		}
   236  		return ConvertedType(c)
   237  	}
   238  
   239  	if ptags, ok := f.Lookup("parquet"); ok {
   240  		info := newTaggedInfo()
   241  		if ptags == "-" {
   242  			info.Exclude = true
   243  			return &info
   244  		}
   245  		for _, tag := range strings.Split(strings.Replace(ptags, "\t", "", -1), ",") {
   246  			tag = strings.TrimSpace(tag)
   247  			kv := strings.SplitN(tag, "=", 2)
   248  			key := strings.TrimSpace(strings.ToLower(kv[0]))
   249  			value := strings.TrimSpace(kv[1])
   250  
   251  			switch key {
   252  			case "name":
   253  				info.Name = value
   254  			case "type":
   255  				info.Type = typeFromStr(value)
   256  			case "keytype":
   257  				info.KeyType = typeFromStr(value)
   258  			case "valuetype":
   259  				info.ValueType = typeFromStr(value)
   260  			case "length":
   261  				info.Length = int32FromType(value)
   262  			case "keylength":
   263  				info.KeyLength = int32FromType(value)
   264  			case "valuelength":
   265  				info.ValueLength = int32FromType(value)
   266  			case "scale":
   267  				info.Scale = int32FromType(value)
   268  			case "keyscale":
   269  				info.KeyScale = int32FromType(value)
   270  			case "valuescale":
   271  				info.ValueScale = int32FromType(value)
   272  			case "precision":
   273  				info.Precision = int32FromType(value)
   274  			case "keyprecision":
   275  				info.KeyPrecision = int32FromType(value)
   276  			case "valueprecision":
   277  				info.ValuePrecision = int32FromType(value)
   278  			case "fieldid":
   279  				info.FieldID = int32FromType(value)
   280  			case "keyfieldid":
   281  				info.KeyFieldID = int32FromType(value)
   282  			case "valuefieldid":
   283  				info.ValueFieldID = int32FromType(value)
   284  			case "repetition":
   285  				info.RepetitionType = repFromStr(value)
   286  			case "valuerepetition":
   287  				info.ValueRepetition = repFromStr(value)
   288  			case "converted":
   289  				info.Converted = convertedFromStr(value)
   290  			case "keyconverted":
   291  				info.KeyConverted = convertedFromStr(value)
   292  			case "valueconverted":
   293  				info.ValueConverted = convertedFromStr(value)
   294  			case "logical":
   295  				info.LogicalFields["type"] = value
   296  			case "keylogical":
   297  				info.KeyLogicalFields["type"] = value
   298  			case "valuelogical":
   299  				info.ValueLogicalFields["type"] = value
   300  			default:
   301  				switch {
   302  				case strings.HasPrefix(key, "logical."):
   303  					info.LogicalFields[strings.TrimPrefix(key, "logical.")] = value
   304  				case strings.HasPrefix(key, "keylogical."):
   305  					info.KeyLogicalFields[strings.TrimPrefix(key, "keylogical.")] = value
   306  				case strings.HasPrefix(key, "valuelogical."):
   307  					info.ValueLogicalFields[strings.TrimPrefix(key, "valuelogical.")] = value
   308  				}
   309  			}
   310  		}
   311  		info.UpdateLogicalTypes()
   312  		return &info
   313  	}
   314  	return nil
   315  }
   316  
   317  // typeToNode recursively converts a physical type and the tag info into parquet Nodes
   318  //
   319  // to avoid having to propagate errors up potentially high numbers of recursive calls
   320  // we use panics and then recover in the public function NewSchemaFromStruct so that a
   321  // failure very far down the stack quickly unwinds.
   322  func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info *taggedInfo) Node {
   323  	// set up our default values for everything
   324  	var (
   325  		converted             = ConvertedTypes.None
   326  		logical   LogicalType = NoLogicalType{}
   327  		fieldID               = int32(-1)
   328  		physical              = parquet.Types.Undefined
   329  		typeLen               = 0
   330  		precision             = 0
   331  		scale                 = 0
   332  	)
   333  	if info != nil { // we have struct tag info to process
   334  		fieldID = info.FieldID
   335  		if info.Converted != ConvertedTypes.NA {
   336  			converted = info.Converted
   337  		}
   338  		logical = info.LogicalType
   339  		physical = info.Type
   340  		typeLen = int(info.Length)
   341  		precision = int(info.Precision)
   342  		scale = int(info.Scale)
   343  
   344  		if info.Name != "" {
   345  			name = info.Name
   346  		}
   347  		if info.RepetitionType != parquet.Repetitions.Undefined {
   348  			repType = info.RepetitionType
   349  		}
   350  	}
   351  
   352  	// simplify the logic by switching based on the reflection Kind
   353  	switch typ.Kind() {
   354  	case reflect.Map:
   355  		// a map must have a logical type of MAP or have no tag for logical type in which case
   356  		// we assume MAP logical type.
   357  		if !logical.IsNone() && !logical.Equals(MapLogicalType{}) {
   358  			panic("cannot set logical type to something other than map for a map")
   359  		}
   360  
   361  		infoCopy := newTaggedInfo()
   362  		if info != nil { // populate any value specific tags to propagate for the value type
   363  			infoCopy = info.CopyForValue()
   364  		}
   365  
   366  		// create the node for the value type of the map
   367  		value := typeToNode("value", typ.Elem(), parquet.Repetitions.Required, &infoCopy)
   368  		if info != nil { // change our copy to now use the key specific tags if they exist
   369  			infoCopy = info.CopyForKey()
   370  		}
   371  
   372  		// create the node for the key type of the map
   373  		key := typeToNode("key", typ.Key(), parquet.Repetitions.Required, &infoCopy)
   374  		if key.RepetitionType() != parquet.Repetitions.Required { // key cannot be optional
   375  			panic("key type of map must be Required")
   376  		}
   377  		return Must(MapOf(name, key, value, repType, fieldID))
   378  	case reflect.Struct:
   379  		if typ == reflect.TypeOf(float16.Num{}) {
   380  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID))
   381  		}
   382  		// structs are Group nodes
   383  		fields := make(FieldList, 0)
   384  		for i := 0; i < typ.NumField(); i++ {
   385  			f := typ.Field(i)
   386  			tags := infoFromTags(f.Tag)
   387  			if tags == nil || !tags.Exclude {
   388  				fields = append(fields, typeToNode(f.Name, f.Type, parquet.Repetitions.Required, tags))
   389  			}
   390  		}
   391  		// group nodes don't have a physical type
   392  		if physical != parquet.Types.Undefined {
   393  			panic("cannot specify custom type on struct")
   394  		}
   395  		// group nodes don't have converted or logical types
   396  		if converted != ConvertedTypes.None {
   397  			panic("cannot specify converted types for a struct")
   398  		}
   399  		if !logical.IsNone() {
   400  			panic("cannot specify logicaltype for a struct")
   401  		}
   402  		return Must(NewGroupNode(name, repType, fields, fieldID))
   403  	case reflect.Ptr: // if we encounter a pointer create a node for the type it points to, but mark it as optional
   404  		return typeToNode(name, typ.Elem(), parquet.Repetitions.Optional, info)
   405  	case reflect.Array:
   406  		// arrays are repeated or fixed size
   407  		if typ == reflect.TypeOf(parquet.Int96{}) {
   408  			return NewInt96Node(name, repType, fieldID)
   409  		}
   410  
   411  		if typ.Elem() == reflect.TypeOf(byte(0)) { // something like [12]byte translates to FixedLenByteArray with length 12
   412  			if physical == parquet.Types.Undefined {
   413  				physical = parquet.Types.FixedLenByteArray
   414  			}
   415  			if typeLen == 0 { // if there was no type length specified in the tag, use the length of the type.
   416  				typeLen = typ.Len()
   417  			}
   418  			if !logical.IsNone() {
   419  				return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID))
   420  			}
   421  			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID))
   422  		}
   423  		fallthrough // if it's not a fixed len byte array type, then just treat it like a slice
   424  	case reflect.Slice:
   425  		// for slices, we default to treating them as lists unless the repetition type is set to REPEATED or they are
   426  		// a bytearray/fixedlenbytearray
   427  		switch {
   428  		case repType == parquet.Repetitions.Repeated:
   429  			return typeToNode(name, typ.Elem(), parquet.Repetitions.Repeated, info)
   430  		case physical == parquet.Types.FixedLenByteArray || physical == parquet.Types.ByteArray:
   431  			if typ.Elem() != reflect.TypeOf(byte(0)) {
   432  				panic("slice with physical type ByteArray or FixedLenByteArray must be []byte")
   433  			}
   434  			fallthrough
   435  		case typ.Elem() == reflect.TypeOf(byte(0)):
   436  			if physical == parquet.Types.Undefined {
   437  				physical = parquet.Types.ByteArray
   438  			}
   439  			if !logical.IsNone() {
   440  				return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID))
   441  			}
   442  			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID))
   443  		default:
   444  			var elemInfo *taggedInfo
   445  			if info != nil {
   446  				elemInfo = &taggedInfo{}
   447  				*elemInfo = info.CopyForValue()
   448  			}
   449  
   450  			if !logical.IsNone() && !logical.Equals(ListLogicalType{}) {
   451  				panic("slice must either be repeated or a List type")
   452  			}
   453  			if converted != ConvertedTypes.None && converted != ConvertedTypes.List {
   454  				panic("slice must either be repeated or a List type")
   455  			}
   456  			return Must(ListOf(typeToNode(name, typ.Elem(), parquet.Repetitions.Required, elemInfo), repType, fieldID))
   457  		}
   458  	case reflect.String:
   459  		// strings are byte arrays or fixedlen byte array
   460  		t := parquet.Types.ByteArray
   461  		switch physical {
   462  		case parquet.Types.Undefined, parquet.Types.ByteArray:
   463  		case parquet.Types.FixedLenByteArray:
   464  			t = parquet.Types.FixedLenByteArray
   465  		default:
   466  			panic("string fields should be of type bytearray or fixedlenbytearray only")
   467  		}
   468  
   469  		if !logical.IsNone() {
   470  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, t, typeLen, fieldID))
   471  		}
   472  
   473  		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, t, converted, typeLen, precision, scale, fieldID))
   474  	case reflect.Int, reflect.Int32, reflect.Int8, reflect.Int16, reflect.Int64:
   475  		// handle integer types, default to setting the corresponding logical type
   476  		ptyp := parquet.Types.Int32
   477  		if typ.Bits() == 64 {
   478  			ptyp = parquet.Types.Int64
   479  		}
   480  
   481  		if physical != parquet.Types.Undefined {
   482  			ptyp = physical
   483  		}
   484  
   485  		if !logical.IsNone() {
   486  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID))
   487  		}
   488  
   489  		bitwidth := int8(typ.Bits())
   490  		if physical != parquet.Types.Undefined {
   491  			if ptyp == parquet.Types.Int32 {
   492  				bitwidth = 32
   493  			} else if ptyp == parquet.Types.Int64 {
   494  				bitwidth = 64
   495  			}
   496  		}
   497  
   498  		if converted != ConvertedTypes.None {
   499  			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID))
   500  		}
   501  
   502  		return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, true), ptyp, 0, fieldID))
   503  	case reflect.Uint, reflect.Uint32, reflect.Uint8, reflect.Uint16, reflect.Uint64:
   504  		// handle unsigned integer types and default to the corresponding logical type for it.
   505  		ptyp := parquet.Types.Int32
   506  		if typ.Bits() == 64 {
   507  			ptyp = parquet.Types.Int64
   508  		}
   509  
   510  		if physical != parquet.Types.Undefined {
   511  			ptyp = physical
   512  		}
   513  
   514  		if !logical.IsNone() {
   515  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID))
   516  		}
   517  
   518  		bitwidth := int8(typ.Bits())
   519  		if physical != parquet.Types.Undefined {
   520  			if ptyp == parquet.Types.Int32 {
   521  				bitwidth = 32
   522  			} else if ptyp == parquet.Types.Int64 {
   523  				bitwidth = 64
   524  			}
   525  		}
   526  
   527  		if converted != ConvertedTypes.None {
   528  			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID))
   529  		}
   530  
   531  		return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, false), ptyp, 0, fieldID))
   532  	case reflect.Bool:
   533  		if !logical.IsNone() {
   534  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Boolean, typeLen, fieldID))
   535  		}
   536  		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Boolean, converted, typeLen, precision, scale, fieldID))
   537  	case reflect.Float32:
   538  		if !logical.IsNone() {
   539  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Float, typeLen, fieldID))
   540  		}
   541  		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Float, converted, typeLen, precision, scale, fieldID))
   542  	case reflect.Float64:
   543  		if !logical.IsNone() {
   544  			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Double, typeLen, fieldID))
   545  		}
   546  		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Double, converted, typeLen, precision, scale, fieldID))
   547  	}
   548  	return nil
   549  }
   550  
   551  // NewSchemaFromStruct generates a schema from an object type via reflection of
   552  // the type and reading struct tags for "parquet".
   553  //
   554  // Rules
   555  //
   556  // Everything defaults to Required repetition, unless otherwise specified.
   557  // Pointer types become Optional repetition.
   558  // Arrays and Slices become logical List types unless using the tag `repetition=repeated`.
   559  //
   560  // A length specified byte field (like [5]byte) becomes a fixed_len_byte_array of that length
   561  // unless otherwise specified by tags.
   562  //
   563  // string and []byte both become ByteArray unless otherwise specified.
   564  //
   565  // Integer types will default to having a logical type of the appropriate bit width
   566  // and signedness rather than having no logical type, ie: an int8 will become an int32
   567  // node with logical type Int(bitWidth=8, signed=true).
   568  //
   569  // Structs will become group nodes with the fields of the struct as the fields of the group,
   570  // recursively creating the nodes.
   571  //
   572  // maps will become appropriate Map structures in the schema of the defined key and values.
   573  //
   574  // Available Tags
   575  //
   576  // name: by default the node will have the same name as the field, this tag let's you specify a name
   577  //
   578  // type: Specify the physical type instead of using the field type
   579  //
   580  // length: specify the type length of the node, only relevant for fixed_len_byte_array
   581  //
   582  // scale: specify the scale for a decimal field
   583  //
   584  // precision: specify the precision for a decimal field
   585  //
   586  // fieldid: specify the field ID for that node, defaults to -1 which means it is not set in the parquet file.
   587  //
   588  // repetition: specify the repetition as something other than what is determined by the type
   589  //
   590  // converted: specify the Converted Type of the field
   591  //
   592  // logical: specify the logical type of the field, if using decimal then the scale and precision
   593  // will be determined by the precision and scale fields, or by the logical.precision / logical.scale fields
   594  // with the logical. prefixed versions taking precedence. For Time or Timestamp logical types,
   595  // use logical.unit=<millis|micros|nanos> and logical.isadjustedutc=<true|false> to set those. Unit is required
   596  // isadjustedutc defaults to true. For Integer logical type, use logical.bitwidth and logical.signed to specify
   597  // those values, with bitwidth being required, and signed defaulting to true.
   598  //
   599  // All tags other than name can use a prefix of "key<tagname>=<value>" to refer to the type of the key for a map
   600  // and "value<tagname>=<value>" to refer to the value type of a map or the element of a list (such as the type of a slice)
   601  func NewSchemaFromStruct(obj interface{}) (sc *Schema, err error) {
   602  	ot := reflect.TypeOf(obj)
   603  	if ot.Kind() == reflect.Ptr {
   604  		ot = ot.Elem()
   605  	}
   606  
   607  	// typeToNode uses panics to fail fast / fail early instead of propagating
   608  	// errors up recursive stacks. so we recover here and return it as an error
   609  	defer func() {
   610  		if r := recover(); r != nil {
   611  			sc = nil
   612  			switch x := r.(type) {
   613  			case string:
   614  				err = xerrors.New(x)
   615  			case error:
   616  				err = x
   617  			default:
   618  				err = xerrors.New("unknown panic")
   619  			}
   620  		}
   621  	}()
   622  
   623  	root := typeToNode(ot.Name(), ot, parquet.Repetitions.Repeated, nil)
   624  	return NewSchema(root.(*GroupNode)), nil
   625  }
   626  
   627  var parquetTypeToReflect = map[parquet.Type]reflect.Type{
   628  	parquet.Types.Boolean:           reflect.TypeOf(true),
   629  	parquet.Types.Int32:             reflect.TypeOf(int32(0)),
   630  	parquet.Types.Int64:             reflect.TypeOf(int64(0)),
   631  	parquet.Types.Float:             reflect.TypeOf(float32(0)),
   632  	parquet.Types.Double:            reflect.TypeOf(float64(0)),
   633  	parquet.Types.Int96:             reflect.TypeOf(parquet.Int96{}),
   634  	parquet.Types.ByteArray:         reflect.TypeOf(parquet.ByteArray{}),
   635  	parquet.Types.FixedLenByteArray: reflect.TypeOf(parquet.FixedLenByteArray{}),
   636  }
   637  
// typeFromNode recursively derives the Go type used to represent the schema node n.
//
// Primitive nodes map to the type registered in parquetTypeToReflect for their
// physical type, or to string when annotated as String / UTF8; the result is
// wrapped in a pointer for Optional and in a slice for Repeated nodes.
//
// Group nodes are handled according to their converted type: LIST groups become
// slices, MAP / MAP_KEY_VALUE groups become maps and every other group becomes
// a struct built with reflect.StructOf.
//
// It panics on structurally invalid list or map nodes; NewStructFromSchema
// recovers those panics and returns them as errors.
func typeFromNode(n Node) reflect.Type {
	switch n.Type() {
	case Primitive:
		typ := parquetTypeToReflect[n.(*PrimitiveNode).PhysicalType()]
		// if a bytearray field is annotated as a String logical type or a UTF8 converted type
		// then use a string instead of parquet.ByteArray / parquet.FixedLenByteArray which are []byte
		if n.LogicalType().Equals(StringLogicalType{}) || n.ConvertedType() == ConvertedTypes.UTF8 {
			typ = reflect.TypeOf(string(""))
		}

		// optional values become pointers, repeated values become slices
		if n.RepetitionType() == parquet.Repetitions.Optional {
			typ = reflect.PtrTo(typ)
		} else if n.RepetitionType() == parquet.Repetitions.Repeated {
			typ = reflect.SliceOf(typ)
		}

		return typ
	case Group:
		gnode := n.(*GroupNode)
		switch gnode.ConvertedType() {
		case ConvertedTypes.List:
			// According to the Parquet Spec, a list should always be a 3-level structure
			//
			//	<list-repetition> group <name> (LIST) {
			//		repeated group list {
			//			<element-repetition> <element-type> element;
			//		}
			//	}
			//
			// Outer-most level must be a group annotated with LIST containing a single field named "list".
			// this level must be only optional (if the list is nullable) or required
			// Middle level, named list, must be repeated group with a single field named "element"
			// "element" field is the lists element type and repetition, which should be only required or optional

			if gnode.fields.Len() != 1 {
				panic("invalid list node, should have exactly 1 child.")
			}

			if gnode.fields[0].RepetitionType() != parquet.Repetitions.Repeated {
				panic("invalid list node, child should be repeated")
			}

			// it is required that the repeated group of elements is named "list" and it's element
			// field is named "element", however existing data may not use this so readers shouldn't
			// enforce them as errors
			//
			// Rules for backward compatibility from the parquet spec:
			//
			// 1) if the repeated field is not a group, then it's type is the element type and elements
			//    must be required.
			// 2) if the repeated field is a group with multiple fields, then its type is the element type
			//    and elements must be required.
			// 3) if the repeated field is a group with one field AND is named either "array" or uses the
			//    LIST-annotated group's name with "_tuple" suffix, then the repeated type is the element
			//    type and the elements must be required.
			// 4) otherwise, the repeated field's type is the element type with the repeated field's repetition

			elemMustBeRequired := false
			addSlice := false
			var elemType reflect.Type
			elemNode := gnode.fields[0]
			// the case expressions are evaluated in order, so the *GroupNode
			// assertions only run once elemNode is known not to be a primitive.
			switch {
			case elemNode.Type() == Primitive,
				elemNode.(*GroupNode).fields.Len() > 1,
				elemNode.(*GroupNode).fields.Len() == 1 && (elemNode.Name() == "array" || elemNode.Name() == gnode.Name()+"_tuple"):
				// rules 1-3: the repeated node itself is the element; since it is
				// repeated, typeFromNode already returns a slice type for it.
				elemMustBeRequired = true
				elemType = typeFromNode(elemNode)
			default:
				// rule 4: the element is the first child of the repeated group, so
				// the slice has to be added around its type here.
				addSlice = true
				elemType = typeFromNode(elemNode.(*GroupNode).fields[0])
			}

			if elemMustBeRequired && elemType.Kind() == reflect.Ptr {
				elemType = elemType.Elem()
			}
			if addSlice {
				elemType = reflect.SliceOf(elemType)
			}
			// a nullable list is represented as a pointer to the slice
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				elemType = reflect.PtrTo(elemType)
			}
			return elemType
		case ConvertedTypes.Map, ConvertedTypes.MapKeyValue:
			// According to the Parquet Spec, the outer-most level should be
			// a group containing a single field named "key_value" with repetition
			// either optional or required for whether or not the map is nullable.
			//
			// The key_value middle level *must* be a repeated group with a "key" field
			// and *optionally* a "value" field
			//
			// the "key" field *must* be required and must always exist
			//
			// the "value" field can be required or optional or omitted.
			//
			// 	<map-repetition> group <name> (MAP) {
			//		repeated group key_value {
			//			required <key-type> key;
			//			<value-repetition> <value-type> value;
			//		}
			//	}

			if gnode.fields.Len() != 1 {
				panic("invalid map node, should have exactly 1 child")
			}

			if gnode.fields[0].Type() != Group {
				panic("invalid map node, child should be a group node")
			}

			// that said, this may not be used in existing data and should not be
			// enforced as errors when reading.
			//
			// some data may also incorrectly use MAP_KEY_VALUE instead of MAP
			//
			// so any group with MAP_KEY_VALUE that is not contained inside of a "MAP"
			// group, should be considered equivalent to being a MAP group itself.
			//
			// in addition, the fields may not be called "key" and "value" in existing
			// data, and as such should not be enforced as errors when reading.

			keyval := gnode.fields[0].(*GroupNode)

			keyIndex := keyval.FieldIndexByName("key")
			if keyIndex == -1 {
				keyIndex = 0 // use first child if there is no child named "key"
			}

			// keys are never pointers, even if the key node is marked optional
			keyType := typeFromNode(keyval.fields[keyIndex])
			if keyType.Kind() == reflect.Ptr {
				keyType = keyType.Elem()
			}
			// can't use a []byte as a key for a map, so use string
			if keyType == reflect.TypeOf(parquet.ByteArray{}) || keyType == reflect.TypeOf(parquet.FixedLenByteArray{}) {
				keyType = reflect.TypeOf(string(""))
			}

			// if the value node is omitted, then consider this a "set" and make it a
			// map[key-type]bool
			valType := reflect.TypeOf(true)
			if keyval.fields.Len() > 1 {
				valIndex := keyval.FieldIndexByName("value")
				if valIndex == -1 {
					valIndex = 1 // use second child if there is no child named "value"
				}

				valType = typeFromNode(keyval.fields[valIndex])
			}

			// a nullable map is represented as a pointer to the map
			mapType := reflect.MapOf(keyType, valType)
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				mapType = reflect.PtrTo(mapType)
			}
			return mapType
		default:
			// any other group becomes a struct with one field per child node,
			// named after the child and typed recursively.
			fields := []reflect.StructField{}
			for _, f := range gnode.fields {
				fields = append(fields, reflect.StructField{
					Name:    f.Name(),
					Type:    typeFromNode(f),
					PkgPath: "parquet",
				})
			}

			structType := reflect.StructOf(fields)
			if gnode.RepetitionType() == parquet.Repetitions.Repeated {
				return reflect.SliceOf(structType)
			}
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				return reflect.PtrTo(structType)
			}
			return structType
		}
	}
	// only reached when n.Type() is neither Primitive nor Group
	panic("what happened?")
}
   813  
   814  // NewStructFromSchema generates a struct type as a reflect.Type from the schema
   815  // by using the appropriate physical types and making things either pointers or slices
   816  // based on whether they are repeated/optional/required. It does not use the logical
   817  // or converted types to change the physical storage so that it is more efficient to use
   818  // the resulting type for reading without having to do conversions.
   819  //
   820  // It will use maps for map types and slices for list types, but otherwise ignores the
   821  // converted and logical types of the nodes. Group nodes that are not List or Map will
   822  // be nested structs.
   823  func NewStructFromSchema(sc *Schema) (t reflect.Type, err error) {
   824  	defer func() {
   825  		if r := recover(); r != nil {
   826  			t = nil
   827  			switch x := r.(type) {
   828  			case string:
   829  				err = xerrors.New(x)
   830  			case error:
   831  				err = x
   832  			default:
   833  				err = xerrors.New("unknown panic")
   834  			}
   835  		}
   836  	}()
   837  
   838  	t = typeFromNode(sc.root)
   839  	if t.Kind() == reflect.Slice || t.Kind() == reflect.Ptr {
   840  		return t.Elem(), nil
   841  	}
   842  	return
   843  }
   844  

View as plain text