// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pqarrow import ( "encoding/binary" "errors" "fmt" "reflect" "sync" "sync/atomic" "time" "unsafe" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/bitutil" "github.com/apache/arrow/go/v15/arrow/decimal128" "github.com/apache/arrow/go/v15/arrow/decimal256" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/internal/utils" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/file" "github.com/apache/arrow/go/v15/parquet/schema" "golang.org/x/sync/errgroup" ) // column reader for leaf columns (non-nested) type leafReader struct { out *arrow.Chunked rctx *readerCtx field *arrow.Field input *columnIterator descr *schema.Column recordRdr file.RecordReader props ArrowReadProperties refCount int64 } func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) { ret := &leafReader{ rctx: rctx, field: field, input: input, descr: input.Descr(), recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type, rctx.mem, bufferPool), props: props, refCount: 1, } err := ret.nextRowGroup() return &ColumnReader{ret}, err } func (lr *leafReader) Retain() { atomic.AddInt64(&lr.refCount, 1) } func (lr *leafReader) Release() { if atomic.AddInt64(&lr.refCount, -1) == 0 { lr.releaseOut() if lr.recordRdr != nil { lr.recordRdr.Release() lr.recordRdr = nil } } } func (lr *leafReader) GetDefLevels() ([]int16, error) { return lr.recordRdr.DefLevels()[:int(lr.recordRdr.LevelsPos())], nil } func (lr *leafReader) GetRepLevels() ([]int16, error) { return lr.recordRdr.RepLevels()[:int(lr.recordRdr.LevelsPos())], nil } func (lr *leafReader) IsOrHasRepeatedChild() bool { return false } func (lr *leafReader) LoadBatch(nrecords int64) (err error) { lr.releaseOut() lr.recordRdr.Reset() if err := lr.recordRdr.Reserve(nrecords); err != nil { return err } for nrecords > 0 { if !lr.recordRdr.HasMore() { break } numRead, err := lr.recordRdr.ReadRecords(nrecords) if err != nil { return err } nrecords -= numRead if numRead == 0 { if err = lr.nextRowGroup(); err != nil { return err } } } lr.out, err = transferColumnData(lr.recordRdr, lr.field.Type, lr.descr) return } func (lr *leafReader) BuildArray(int64) (*arrow.Chunked, error) { return lr.clearOut(), nil } // releaseOut will clear lr.out as well as release it if it wasn't nil func (lr *leafReader) releaseOut() { if out := lr.clearOut(); out != nil { out.Release() } } // clearOut will clear lt.out and return the old value func (lr *leafReader) clearOut() (out *arrow.Chunked) { out, lr.out = lr.out, nil return out } func (lr *leafReader) Field() *arrow.Field { return lr.field } func (lr *leafReader) nextRowGroup() error { pr, err := lr.input.NextChunk() if err != nil { return err } lr.recordRdr.SetPageReader(pr) return nil } // column reader for struct arrays, has readers for each child which could // themselves be nested or leaf columns. type structReader struct { rctx *readerCtx filtered *arrow.Field levelInfo file.LevelInfo children []*ColumnReader defRepLevelChild *ColumnReader hasRepeatedChild bool props ArrowReadProperties refCount int64 } func (sr *structReader) Retain() { atomic.AddInt64(&sr.refCount, 1) } func (sr *structReader) Release() { if atomic.AddInt64(&sr.refCount, -1) == 0 { if sr.defRepLevelChild != nil { sr.defRepLevelChild.Release() sr.defRepLevelChild = nil } for _, c := range sr.children { c.Release() } sr.children = nil } } func newStructReader(rctx *readerCtx, filtered *arrow.Field, levelInfo file.LevelInfo, children []*ColumnReader, props ArrowReadProperties) *ColumnReader { ret := &structReader{ rctx: rctx, filtered: filtered, levelInfo: levelInfo, children: children, props: props, refCount: 1, } // there could be a mix of children some might be repeated and some might not be // if possible use one that isn't since that will be guaranteed to have the least // number of levels to reconstruct a nullable bitmap for _, child := range children { if !child.IsOrHasRepeatedChild() { ret.defRepLevelChild = child break } } if ret.defRepLevelChild == nil { ret.defRepLevelChild = children[0] ret.hasRepeatedChild = true } ret.defRepLevelChild.Retain() return &ColumnReader{ret} } func (sr *structReader) IsOrHasRepeatedChild() bool { return sr.hasRepeatedChild } func (sr *structReader) GetDefLevels() ([]int16, error) { if len(sr.children) == 0 { return nil, errors.New("struct reader has no children") } // this method should only be called when this struct or one of its parents // are optional/repeated or has a repeated child // meaning all children must have rep/def levels associated with them return sr.defRepLevelChild.GetDefLevels() } func (sr *structReader) GetRepLevels() ([]int16, error) { if len(sr.children) == 0 { return nil, errors.New("struct reader has no children") } // this method should only be called when this struct or one of its parents // are optional/repeated or has a repeated child // meaning all children must have rep/def levels associated with them return sr.defRepLevelChild.GetRepLevels() } func (sr *structReader) LoadBatch(nrecords int64) error { // Load batches in parallel // When reading structs with large numbers of columns, the serial load is very slow. // This is especially true when reading Cloud Storage. Loading concurrently // greatly improves performance. g := new(errgroup.Group) if !sr.props.Parallel { g.SetLimit(1) } for _, rdr := range sr.children { rdr := rdr g.Go(func() error { return rdr.LoadBatch(nrecords) }) } return g.Wait() } func (sr *structReader) Field() *arrow.Field { return sr.filtered } func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { validityIO := file.ValidityBitmapInputOutput{ ReadUpperBound: lenBound, Read: lenBound, } var nullBitmap *memory.Buffer if lenBound > 0 && (sr.hasRepeatedChild || sr.filtered.Nullable) { nullBitmap = memory.NewResizableBuffer(sr.rctx.mem) nullBitmap.Resize(int(bitutil.BytesForBits(lenBound))) defer nullBitmap.Release() validityIO.ValidBits = nullBitmap.Bytes() defLevels, err := sr.GetDefLevels() if err != nil { return nil, err } if sr.hasRepeatedChild { repLevels, err := sr.GetRepLevels() if err != nil { return nil, err } if err := file.DefRepLevelsToBitmap(defLevels, repLevels, sr.levelInfo, &validityIO); err != nil { return nil, err } } else { file.DefLevelsToBitmap(defLevels, sr.levelInfo, &validityIO) } } if nullBitmap != nil { nullBitmap.Resize(int(bitutil.BytesForBits(validityIO.Read))) } childArrData := make([]arrow.ArrayData, len(sr.children)) defer releaseArrayData(childArrData) // gather children arrays and def levels for i, child := range sr.children { field, err := child.BuildArray(lenBound) if err != nil { return nil, err } childArrData[i], err = chunksToSingle(field) field.Release() // release field before checking if err != nil { return nil, err } } if !sr.filtered.Nullable && !sr.hasRepeatedChild { validityIO.Read = int64(childArrData[0].Len()) } buffers := make([]*memory.Buffer, 1) if validityIO.NullCount > 0 { buffers[0] = nullBitmap } data := array.NewData(sr.filtered.Type, int(validityIO.Read), buffers, childArrData, int(validityIO.NullCount), 0) defer data.Release() arr := array.NewStructData(data) defer arr.Release() return arrow.NewChunked(sr.filtered.Type, []arrow.Array{arr}), nil } // column reader for repeated columns specifically for list arrays type listReader struct { rctx *readerCtx field *arrow.Field info file.LevelInfo itemRdr *ColumnReader props ArrowReadProperties refCount int64 } func newListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader { childRdr.Retain() return &ColumnReader{&listReader{rctx, field, info, childRdr, props, 1}} } func (lr *listReader) Retain() { atomic.AddInt64(&lr.refCount, 1) } func (lr *listReader) Release() { if atomic.AddInt64(&lr.refCount, -1) == 0 { if lr.itemRdr != nil { lr.itemRdr.Release() lr.itemRdr = nil } } } func (lr *listReader) GetDefLevels() ([]int16, error) { return lr.itemRdr.GetDefLevels() } func (lr *listReader) GetRepLevels() ([]int16, error) { return lr.itemRdr.GetRepLevels() } func (lr *listReader) Field() *arrow.Field { return lr.field } func (lr *listReader) IsOrHasRepeatedChild() bool { return true } func (lr *listReader) LoadBatch(nrecords int64) error { return lr.itemRdr.LoadBatch(nrecords) } func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { var ( defLevels []int16 repLevels []int16 err error validityBuffer *memory.Buffer ) if defLevels, err = lr.itemRdr.GetDefLevels(); err != nil { return nil, err } if repLevels, err = lr.itemRdr.GetRepLevels(); err != nil { return nil, err } validityIO := file.ValidityBitmapInputOutput{ReadUpperBound: lenBound} if lr.field.Nullable { validityBuffer = memory.NewResizableBuffer(lr.rctx.mem) validityBuffer.Resize(int(bitutil.BytesForBits(lenBound))) defer validityBuffer.Release() validityIO.ValidBits = validityBuffer.Bytes() } offsetsBuffer := memory.NewResizableBuffer(lr.rctx.mem) offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(lenBound) + 1)) defer offsetsBuffer.Release() offsetData := arrow.Int32Traits.CastFromBytes(offsetsBuffer.Bytes()) if err = file.DefRepLevelsToListInfo(defLevels, repLevels, lr.info, &validityIO, offsetData); err != nil { return nil, err } // if the parent (itemRdr) has nulls and is a nested type like list // then we need BuildArray to account for that with the number of // definition levels when building out the bitmap. So the upper bound // to make sure we have the space for is the worst case scenario, // the upper bound is the value of the last offset + the nullcount arr, err := lr.itemRdr.BuildArray(int64(offsetData[int(validityIO.Read)]) + validityIO.NullCount) if err != nil { return nil, err } defer arr.Release() // resize to actual number of elems returned offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(validityIO.Read) + 1)) if validityBuffer != nil { validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read))) } item, err := chunksToSingle(arr) if err != nil { return nil, err } defer item.Release() buffers := []*memory.Buffer{nil, offsetsBuffer} if validityIO.NullCount > 0 { buffers[0] = validityBuffer } data := array.NewData(lr.field.Type, int(validityIO.Read), buffers, []arrow.ArrayData{item}, int(validityIO.NullCount), 0) defer data.Release() if lr.field.Type.ID() == arrow.FIXED_SIZE_LIST { defer data.Buffers()[1].Release() listSize := lr.field.Type.(*arrow.FixedSizeListType).Len() for x := 1; x < data.Len(); x++ { size := offsetData[x] - offsetData[x-1] if size != listSize { return nil, fmt.Errorf("expected all lists to be of size=%d, but index %d had size=%d", listSize, x, size) } } data.Buffers()[1] = nil } out := array.MakeFromData(data) defer out.Release() return arrow.NewChunked(lr.field.Type, []arrow.Array{out}), nil } // column reader logic for fixed size lists instead of variable length ones. type fixedSizeListReader struct { listReader } func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader { childRdr.Retain() return &ColumnReader{&fixedSizeListReader{listReader{rctx, field, info, childRdr, props, 1}}} } // helper function to combine chunks into a single array. // // nested data conversion for chunked array outputs not yet implemented func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) { switch len(chunked.Chunks()) { case 0: return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil case 1: data := chunked.Chunk(0).Data() data.Retain() // we pass control to the caller return data, nil default: // if an item reader yields a chunked array, this is not yet implemented return nil, arrow.ErrNotImplemented } } // create a chunked arrow array from the raw record data func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *schema.Column) (*arrow.Chunked, error) { dt := valueType if valueType.ID() == arrow.EXTENSION { dt = valueType.(arrow.ExtensionType).StorageType() } var data arrow.ArrayData switch dt.ID() { case arrow.DICTIONARY: return transferDictionary(rdr, valueType), nil case arrow.NULL: return arrow.NewChunked(arrow.Null, []arrow.Array{array.NewNull(rdr.ValuesWritten())}), nil case arrow.INT32, arrow.INT64, arrow.FLOAT32, arrow.FLOAT64: data = transferZeroCopy(rdr, valueType) // can just reference the raw data without copying case arrow.BOOL: data = transferBool(rdr) case arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64, arrow.INT8, arrow.INT16, arrow.DATE32, arrow.TIME32, arrow.TIME64: data = transferInt(rdr, valueType) case arrow.DATE64: data = transferDate64(rdr, valueType) case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING: return transferBinary(rdr, valueType), nil case arrow.DECIMAL, arrow.DECIMAL256: switch descr.PhysicalType() { case parquet.Types.Int32, parquet.Types.Int64: data = transferDecimalInteger(rdr, valueType) case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray: return transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType) default: return nil, errors.New("physical type for decimal128/decimal256 must be int32, int64, bytearray or fixed len byte array") } case arrow.TIMESTAMP: tstype := valueType.(*arrow.TimestampType) switch tstype.Unit { case arrow.Millisecond, arrow.Microsecond: data = transferZeroCopy(rdr, valueType) case arrow.Nanosecond: if descr.PhysicalType() == parquet.Types.Int96 { data = transferInt96(rdr, valueType) } else { data = transferZeroCopy(rdr, valueType) } default: return nil, errors.New("time unit not supported") } case arrow.FLOAT16: if descr.PhysicalType() != parquet.Types.FixedLenByteArray { return nil, errors.New("physical type for float16 must be fixed len byte array") } if len := arrow.Float16SizeBytes; descr.TypeLength() != len { return nil, fmt.Errorf("fixed len byte array length for float16 must be %d", len) } return transferBinary(rdr, valueType), nil default: return nil, fmt.Errorf("no support for reading columns of type: %s", valueType.Name()) } defer data.Release() arr := array.MakeFromData(data) defer arr.Release() return arrow.NewChunked(valueType, []arrow.Array{arr}), nil } func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { bitmap := rdr.ReleaseValidBits() values := rdr.ReleaseValues() defer func() { if bitmap != nil { bitmap.Release() } if values != nil { values.Release() } }() return array.NewData(dt, rdr.ValuesWritten(), []*memory.Buffer{bitmap, values}, nil, int(rdr.NullCount()), 0) } func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked { brdr := rdr.(file.BinaryRecordReader) if brdr.ReadDictionary() { return transferDictionary(brdr, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt}) } chunks := brdr.GetBuilderChunks() defer releaseArrays(chunks) switch dt := dt.(type) { case arrow.ExtensionType: for idx, chunk := range chunks { chunks[idx] = array.NewExtensionArrayWithStorage(dt, chunk) chunk.Release() } case *arrow.StringType, *arrow.LargeStringType: for idx, chunk := range chunks { chunks[idx] = array.MakeFromData(chunk.Data()) chunk.Release() } case *arrow.Float16Type: for idx, chunk := range chunks { data := chunk.Data() f16_data := array.NewData(dt, data.Len(), data.Buffers(), nil, data.NullN(), data.Offset()) defer f16_data.Release() chunks[idx] = array.NewFloat16Data(f16_data) chunk.Release() } } return arrow.NewChunked(dt, chunks) } func transferInt(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { var ( output reflect.Value ) signed := true // create buffer for proper type since parquet only has int32 and int64 // physical representations, but we want the correct type representation // for Arrow's in memory buffer. data := make([]byte, rdr.ValuesWritten()*int(bitutil.BytesForBits(int64(dt.(arrow.FixedWidthDataType).BitWidth())))) switch dt.ID() { case arrow.INT8: output = reflect.ValueOf(arrow.Int8Traits.CastFromBytes(data)) case arrow.UINT8: signed = false output = reflect.ValueOf(arrow.Uint8Traits.CastFromBytes(data)) case arrow.INT16: output = reflect.ValueOf(arrow.Int16Traits.CastFromBytes(data)) case arrow.UINT16: signed = false output = reflect.ValueOf(arrow.Uint16Traits.CastFromBytes(data)) case arrow.UINT32: signed = false output = reflect.ValueOf(arrow.Uint32Traits.CastFromBytes(data)) case arrow.UINT64: signed = false output = reflect.ValueOf(arrow.Uint64Traits.CastFromBytes(data)) case arrow.DATE32: output = reflect.ValueOf(arrow.Date32Traits.CastFromBytes(data)) case arrow.TIME32: output = reflect.ValueOf(arrow.Time32Traits.CastFromBytes(data)) case arrow.TIME64: output = reflect.ValueOf(arrow.Time64Traits.CastFromBytes(data)) } length := rdr.ValuesWritten() // copy the values semantically with the correct types switch rdr.Type() { case parquet.Types.Int32: values := arrow.Int32Traits.CastFromBytes(rdr.Values()) if signed { for idx, v := range values[:length] { output.Index(idx).SetInt(int64(v)) } } else { for idx, v := range values[:length] { output.Index(idx).SetUint(uint64(v)) } } case parquet.Types.Int64: values := arrow.Int64Traits.CastFromBytes(rdr.Values()) if signed { for idx, v := range values[:length] { output.Index(idx).SetInt(v) } } else { for idx, v := range values[:length] { output.Index(idx).SetUint(uint64(v)) } } } bitmap := rdr.ReleaseValidBits() if bitmap != nil { defer bitmap.Release() } return array.NewData(dt, rdr.ValuesWritten(), []*memory.Buffer{ bitmap, memory.NewBufferBytes(data), }, nil, int(rdr.NullCount()), 0) } func transferBool(rdr file.RecordReader) arrow.ArrayData { // TODO(mtopol): optimize this so we don't convert bitmap to []bool back to bitmap length := rdr.ValuesWritten() data := make([]byte, int(bitutil.BytesForBits(int64(length)))) bytedata := rdr.Values() values := *(*[]bool)(unsafe.Pointer(&bytedata)) for idx, v := range values[:length] { if v { bitutil.SetBit(data, idx) } } bitmap := rdr.ReleaseValidBits() if bitmap != nil { defer bitmap.Release() } bb := memory.NewBufferBytes(data) defer bb.Release() return array.NewData(&arrow.BooleanType{}, length, []*memory.Buffer{ bitmap, bb, }, nil, int(rdr.NullCount()), 0) } var milliPerDay = time.Duration(24 * time.Hour).Milliseconds() // parquet equivalent for date64 is a 32-bit integer of the number of days // since the epoch. Convert each value to milliseconds for date64 func transferDate64(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { length := rdr.ValuesWritten() values := arrow.Int32Traits.CastFromBytes(rdr.Values()) data := make([]byte, arrow.Int64Traits.BytesRequired(length)) out := arrow.Int64Traits.CastFromBytes(data) for idx, val := range values[:length] { out[idx] = int64(val) * milliPerDay } bitmap := rdr.ReleaseValidBits() if bitmap != nil { defer bitmap.Release() } return array.NewData(dt, length, []*memory.Buffer{ bitmap, memory.NewBufferBytes(data), }, nil, int(rdr.NullCount()), 0) } // coerce int96 to nanosecond timestamp func transferInt96(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { length := rdr.ValuesWritten() values := parquet.Int96Traits.CastFromBytes(rdr.Values()) data := make([]byte, arrow.Int64SizeBytes*length) out := arrow.Int64Traits.CastFromBytes(data) for idx, val := range values[:length] { if binary.LittleEndian.Uint32(val[8:]) == 0 { out[idx] = 0 } else { out[idx] = val.ToTime().UnixNano() } } bitmap := rdr.ReleaseValidBits() if bitmap != nil { defer bitmap.Release() } return array.NewData(dt, length, []*memory.Buffer{ bitmap, memory.NewBufferBytes(data), }, nil, int(rdr.NullCount()), 0) } // convert physical integer storage of a decimal logical type to a decimal128 typed array func transferDecimalInteger(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData { length := rdr.ValuesWritten() var values reflect.Value switch rdr.Type() { case parquet.Types.Int32: values = reflect.ValueOf(arrow.Int32Traits.CastFromBytes(rdr.Values())[:length]) case parquet.Types.Int64: values = reflect.ValueOf(arrow.Int64Traits.CastFromBytes(rdr.Values())[:length]) } var data []byte switch dt.ID() { case arrow.DECIMAL128: data = make([]byte, arrow.Decimal128Traits.BytesRequired(length)) out := arrow.Decimal128Traits.CastFromBytes(data) for i := 0; i < values.Len(); i++ { out[i] = decimal128.FromI64(values.Index(i).Int()) } case arrow.DECIMAL256: data = make([]byte, arrow.Decimal256Traits.BytesRequired(length)) out := arrow.Decimal256Traits.CastFromBytes(data) for i := 0; i < values.Len(); i++ { out[i] = decimal256.FromI64(values.Index(i).Int()) } } var nullmap *memory.Buffer if rdr.NullCount() > 0 { nullmap = rdr.ReleaseValidBits() defer nullmap.Release() } return array.NewData(dt, length, []*memory.Buffer{ nullmap, memory.NewBufferBytes(data), }, nil, int(rdr.NullCount()), 0) } func uint64FromBigEndianShifted(buf []byte) uint64 { var ( bytes [8]byte ) copy(bytes[8-len(buf):], buf) return binary.BigEndian.Uint64(bytes[:]) } // parquet's defined encoding for decimal data is for it to be written as big // endian bytes, so convert a bit endian byte order to a decimal128 func bigEndianToDecimal128(buf []byte) (decimal128.Num, error) { const ( minDecimalBytes = 1 maxDecimalBytes = 16 ) if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes { return decimal128.Num{}, fmt.Errorf("length of byte array passed to bigEndianToDecimal128 was %d but must be between %d and %d", len(buf), minDecimalBytes, maxDecimalBytes) } // bytes are big endian so first byte is MSB and holds the sign bit isNeg := int8(buf[0]) < 0 // 1. extract high bits highBitsOffset := utils.Max(0, len(buf)-8) var ( highBits uint64 lowBits uint64 hi int64 lo int64 ) highBits = uint64FromBigEndianShifted(buf[:highBitsOffset]) if highBitsOffset == 8 { hi = int64(highBits) } else { if isNeg && len(buf) < maxDecimalBytes { hi = -1 } hi = int64(uint64(hi) << (uint64(highBitsOffset) * 8)) hi |= int64(highBits) } // 2. extract lower bits lowBitsOffset := utils.Min(len(buf), 8) lowBits = uint64FromBigEndianShifted(buf[highBitsOffset:]) if lowBitsOffset == 8 { lo = int64(lowBits) } else { if isNeg && len(buf) < 8 { lo = -1 } lo = int64(uint64(lo) << (uint64(lowBitsOffset) * 8)) lo |= int64(lowBits) } return decimal128.New(hi, uint64(lo)), nil } func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) { const ( minDecimalBytes = 1 maxDecimalBytes = 32 ) if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes { return decimal256.Num{}, fmt.Errorf("%w: length of byte array for bigEndianToDecimal256 was %d but must be between %d and %d", arrow.ErrInvalid, len(buf), minDecimalBytes, maxDecimalBytes) } var littleEndian [4]uint64 // bytes are coming in big-endian, so the first byte is the MSB and // therefore holds the sign bit initWord, isNeg := uint64(0), int8(buf[0]) < 0 if isNeg { // sign extend if necessary initWord = uint64(0xFFFFFFFFFFFFFFFF) } for wordIdx := 0; wordIdx < 4; wordIdx++ { wordLen := utils.Min(len(buf), arrow.Uint64SizeBytes) word := buf[len(buf)-wordLen:] if wordLen == 8 { // full words can be assigned as-is littleEndian[wordIdx] = binary.BigEndian.Uint64(word) } else { result := initWord if len(buf) > 0 { // incorporate the actual values if present // shift left enough bits to make room for the incoming int64 result = result << uint64(wordLen) // preserve the upper bits by inplace OR-ing the int64 result |= uint64FromBigEndianShifted(word) } littleEndian[wordIdx] = result } buf = buf[:len(buf)-wordLen] } return decimal256.New(littleEndian[3], littleEndian[2], littleEndian[1], littleEndian[0]), nil } type varOrFixedBin interface { arrow.Array Value(i int) []byte } // convert physical byte storage, instead of integers, to decimal128 func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arrow.Chunked, error) { convert128 := func(in varOrFixedBin) (arrow.Array, error) { length := in.Len() data := make([]byte, arrow.Decimal128Traits.BytesRequired(length)) out := arrow.Decimal128Traits.CastFromBytes(data) nullCount := in.NullN() var err error for i := 0; i < length; i++ { if nullCount > 0 && in.IsNull(i) { continue } rec := in.Value(i) if len(rec) <= 0 { return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt) } out[i], err = bigEndianToDecimal128(rec) if err != nil { return nil, err } } ret := array.NewData(dt, length, []*memory.Buffer{ in.Data().Buffers()[0], memory.NewBufferBytes(data), }, nil, nullCount, 0) defer ret.Release() return array.MakeFromData(ret), nil } convert256 := func(in varOrFixedBin) (arrow.Array, error) { length := in.Len() data := make([]byte, arrow.Decimal256Traits.BytesRequired(length)) out := arrow.Decimal256Traits.CastFromBytes(data) nullCount := in.NullN() var err error for i := 0; i < length; i++ { if nullCount > 0 && in.IsNull(i) { continue } rec := in.Value(i) if len(rec) <= 0 { return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt) } out[i], err = bigEndianToDecimal256(rec) if err != nil { return nil, err } } ret := array.NewData(dt, length, []*memory.Buffer{ in.Data().Buffers()[0], memory.NewBufferBytes(data), }, nil, nullCount, 0) defer ret.Release() return array.MakeFromData(ret), nil } convert := func(arr arrow.Array) (arrow.Array, error) { switch dt.ID() { case arrow.DECIMAL128: return convert128(arr.(varOrFixedBin)) case arrow.DECIMAL256: return convert256(arr.(varOrFixedBin)) } return nil, arrow.ErrNotImplemented } chunks := rdr.GetBuilderChunks() var err error for idx, chunk := range chunks { defer chunk.Release() if chunks[idx], err = convert(chunk); err != nil { return nil, err } defer chunks[idx].Release() } return arrow.NewChunked(dt, chunks), nil } func transferDictionary(rdr file.RecordReader, logicalValueType arrow.DataType) *arrow.Chunked { brdr := rdr.(file.BinaryRecordReader) chunks := brdr.GetBuilderChunks() defer releaseArrays(chunks) return arrow.NewChunked(logicalValueType, chunks) }