1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package pqarrow
18
19 import (
20 "encoding/binary"
21 "errors"
22 "fmt"
23 "reflect"
24 "sync"
25 "sync/atomic"
26 "time"
27 "unsafe"
28
29 "github.com/apache/arrow/go/v15/arrow"
30 "github.com/apache/arrow/go/v15/arrow/array"
31 "github.com/apache/arrow/go/v15/arrow/bitutil"
32 "github.com/apache/arrow/go/v15/arrow/decimal128"
33 "github.com/apache/arrow/go/v15/arrow/decimal256"
34 "github.com/apache/arrow/go/v15/arrow/memory"
35 "github.com/apache/arrow/go/v15/internal/utils"
36 "github.com/apache/arrow/go/v15/parquet"
37 "github.com/apache/arrow/go/v15/parquet/file"
38 "github.com/apache/arrow/go/v15/parquet/schema"
39 "golang.org/x/sync/errgroup"
40 )
41
42
// leafReader reads a single parquet leaf (primitive) column and surfaces
// its values as arrow chunked arrays.
type leafReader struct {
	out       *arrow.Chunked      // result of the most recent LoadBatch; handed off by BuildArray
	rctx      *readerCtx          // shared reader state (allocator etc.)
	field     *arrow.Field        // arrow field this column maps to
	input     *columnIterator     // iterator over the column's row-group chunks
	descr     *schema.Column      // parquet schema descriptor for the column
	recordRdr file.RecordReader   // low-level record reader doing the decoding
	props     ArrowReadProperties // arrow-specific read options

	refCount int64 // atomic reference count; resources freed when it reaches 0
}
54
// newLeafReader constructs a ColumnReader over a single parquet leaf
// column and advances the input iterator to its first row-group chunk.
// bufferPool is forwarded to the underlying record reader for buffer reuse.
func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) {
	ret := &leafReader{
		rctx:      rctx,
		field:     field,
		input:     input,
		descr:     input.Descr(),
		recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type, rctx.mem, bufferPool),
		props:     props,
		refCount:  1,
	}
	// position the record reader at the first page reader; any error is
	// returned alongside the constructed reader
	err := ret.nextRowGroup()
	return &ColumnReader{ret}, err
}
68
// Retain increments the reader's reference count.
func (lr *leafReader) Retain() {
	atomic.AddInt64(&lr.refCount, 1)
}
72
73 func (lr *leafReader) Release() {
74 if atomic.AddInt64(&lr.refCount, -1) == 0 {
75 lr.releaseOut()
76 if lr.recordRdr != nil {
77 lr.recordRdr.Release()
78 lr.recordRdr = nil
79 }
80 }
81 }
82
// GetDefLevels returns the definition levels decoded for the current
// batch (only the LevelsPos prefix of the buffer is valid).
func (lr *leafReader) GetDefLevels() ([]int16, error) {
	return lr.recordRdr.DefLevels()[:int(lr.recordRdr.LevelsPos())], nil
}
86
// GetRepLevels returns the repetition levels decoded for the current
// batch (only the LevelsPos prefix of the buffer is valid).
func (lr *leafReader) GetRepLevels() ([]int16, error) {
	return lr.recordRdr.RepLevels()[:int(lr.recordRdr.LevelsPos())], nil
}
90
// IsOrHasRepeatedChild always reports false: a leaf column has no children.
func (lr *leafReader) IsOrHasRepeatedChild() bool { return false }
92
93 func (lr *leafReader) LoadBatch(nrecords int64) (err error) {
94 lr.releaseOut()
95 lr.recordRdr.Reset()
96
97 if err := lr.recordRdr.Reserve(nrecords); err != nil {
98 return err
99 }
100 for nrecords > 0 {
101 if !lr.recordRdr.HasMore() {
102 break
103 }
104 numRead, err := lr.recordRdr.ReadRecords(nrecords)
105 if err != nil {
106 return err
107 }
108 nrecords -= numRead
109 if numRead == 0 {
110 if err = lr.nextRowGroup(); err != nil {
111 return err
112 }
113 }
114 }
115 lr.out, err = transferColumnData(lr.recordRdr, lr.field.Type, lr.descr)
116 return
117 }
118
// BuildArray transfers ownership of the chunk produced by the last
// LoadBatch to the caller; the length-bound argument is unused for leaves.
func (lr *leafReader) BuildArray(int64) (*arrow.Chunked, error) {
	return lr.clearOut(), nil
}
122
123
124 func (lr *leafReader) releaseOut() {
125 if out := lr.clearOut(); out != nil {
126 out.Release()
127 }
128 }
129
130
131 func (lr *leafReader) clearOut() (out *arrow.Chunked) {
132 out, lr.out = lr.out, nil
133 return out
134 }
135
// Field returns the arrow field this reader produces.
func (lr *leafReader) Field() *arrow.Field { return lr.field }
137
138 func (lr *leafReader) nextRowGroup() error {
139 pr, err := lr.input.NextChunk()
140 if err != nil {
141 return err
142 }
143 lr.recordRdr.SetPageReader(pr)
144 return nil
145 }
146
147
148
// structReader composes several child column readers into one
// struct-typed column, reconstructing the struct's validity bitmap from
// the def/rep levels of one designated child.
type structReader struct {
	rctx             *readerCtx          // shared reader state
	filtered         *arrow.Field        // (possibly column-filtered) struct field being produced
	levelInfo        file.LevelInfo      // level info for def/rep -> validity translation
	children         []*ColumnReader     // readers for each struct member
	defRepLevelChild *ColumnReader       // child whose def/rep levels stand in for the struct's
	hasRepeatedChild bool                // true when every child is or contains repeated data
	props            ArrowReadProperties // arrow-specific read options

	refCount int64 // atomic reference count
}
160
// Retain increments the reader's reference count.
func (sr *structReader) Retain() {
	atomic.AddInt64(&sr.refCount, 1)
}
164
165 func (sr *structReader) Release() {
166 if atomic.AddInt64(&sr.refCount, -1) == 0 {
167 if sr.defRepLevelChild != nil {
168 sr.defRepLevelChild.Release()
169 sr.defRepLevelChild = nil
170 }
171 for _, c := range sr.children {
172 c.Release()
173 }
174 sr.children = nil
175 }
176 }
177
// newStructReader wires up a structReader over the given children and
// picks which child will supply definition/repetition levels for the
// struct itself.
func newStructReader(rctx *readerCtx, filtered *arrow.Field, levelInfo file.LevelInfo, children []*ColumnReader, props ArrowReadProperties) *ColumnReader {
	ret := &structReader{
		rctx:      rctx,
		filtered:  filtered,
		levelInfo: levelInfo,
		children:  children,
		props:     props,
		refCount:  1,
	}

	// prefer a child with no repeated descendant: its def levels describe
	// the struct's nullability without needing repetition levels
	for _, child := range children {
		if !child.IsOrHasRepeatedChild() {
			ret.defRepLevelChild = child
			break
		}
	}

	// every child is (or contains) repeated data: fall back to the first
	// child and remember that rep levels will be needed too.
	// NOTE(review): assumes children is non-empty here — confirm callers.
	if ret.defRepLevelChild == nil {
		ret.defRepLevelChild = children[0]
		ret.hasRepeatedChild = true
	}
	ret.defRepLevelChild.Retain()
	return &ColumnReader{ret}
}
205
// IsOrHasRepeatedChild reports whether every child of this struct is (or
// contains) repeated data, as determined at construction.
func (sr *structReader) IsOrHasRepeatedChild() bool { return sr.hasRepeatedChild }
207
// GetDefLevels returns the struct's definition levels, delegating to the
// designated level-source child.
func (sr *structReader) GetDefLevels() ([]int16, error) {
	if len(sr.children) == 0 {
		return nil, errors.New("struct reader has no children")
	}

	// the level-source child's def levels stand in for the struct's own
	return sr.defRepLevelChild.GetDefLevels()
}
218
// GetRepLevels returns the struct's repetition levels, delegating to the
// designated level-source child.
func (sr *structReader) GetRepLevels() ([]int16, error) {
	if len(sr.children) == 0 {
		return nil, errors.New("struct reader has no children")
	}

	// the level-source child's rep levels stand in for the struct's own
	return sr.defRepLevelChild.GetRepLevels()
}
229
230 func (sr *structReader) LoadBatch(nrecords int64) error {
231
232
233
234
235 g := new(errgroup.Group)
236 if !sr.props.Parallel {
237 g.SetLimit(1)
238 }
239 for _, rdr := range sr.children {
240 rdr := rdr
241 g.Go(func() error {
242 return rdr.LoadBatch(nrecords)
243 })
244 }
245
246 return g.Wait()
247 }
248
// Field returns the (possibly filtered) struct field this reader produces.
func (sr *structReader) Field() *arrow.Field { return sr.filtered }
250
// BuildArray assembles the struct column: it reconstructs the validity
// bitmap (when the struct can be null) from def/rep levels, builds each
// child array, and combines everything into a single-chunk chunked array.
func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
	validityIO := file.ValidityBitmapInputOutput{
		ReadUpperBound: lenBound,
		Read:           lenBound,
	}

	var nullBitmap *memory.Buffer

	// only materialize a validity bitmap when the struct can actually carry
	// nulls (nullable field, or nullability reconstructed through levels)
	if lenBound > 0 && (sr.hasRepeatedChild || sr.filtered.Nullable) {
		nullBitmap = memory.NewResizableBuffer(sr.rctx.mem)
		nullBitmap.Resize(int(bitutil.BytesForBits(lenBound)))
		defer nullBitmap.Release()
		validityIO.ValidBits = nullBitmap.Bytes()
		defLevels, err := sr.GetDefLevels()
		if err != nil {
			return nil, err
		}

		if sr.hasRepeatedChild {
			// repeated descendants: rep levels are required to find record
			// boundaries before the bitmap can be derived
			repLevels, err := sr.GetRepLevels()
			if err != nil {
				return nil, err
			}

			if err := file.DefRepLevelsToBitmap(defLevels, repLevels, sr.levelInfo, &validityIO); err != nil {
				return nil, err
			}
		} else {
			file.DefLevelsToBitmap(defLevels, sr.levelInfo, &validityIO)
		}
	}

	if nullBitmap != nil {
		// trim the bitmap to the number of values actually read
		nullBitmap.Resize(int(bitutil.BytesForBits(validityIO.Read)))
	}

	childArrData := make([]arrow.ArrayData, len(sr.children))
	defer releaseArrayData(childArrData)

	for i, child := range sr.children {
		field, err := child.BuildArray(lenBound)
		if err != nil {
			return nil, err
		}

		// each child must collapse to a single chunk to form struct data
		childArrData[i], err = chunksToSingle(field)
		field.Release()
		if err != nil {
			return nil, err
		}
	}

	// no bitmap was computed above, so take the length from the children.
	// NOTE(review): assumes at least one child exists — confirm callers.
	if !sr.filtered.Nullable && !sr.hasRepeatedChild {
		validityIO.Read = int64(childArrData[0].Len())
	}

	buffers := make([]*memory.Buffer, 1)
	if validityIO.NullCount > 0 {
		buffers[0] = nullBitmap
	}

	data := array.NewData(sr.filtered.Type, int(validityIO.Read), buffers, childArrData, int(validityIO.NullCount), 0)
	defer data.Release()
	arr := array.NewStructData(data)
	defer arr.Release()
	return arrow.NewChunked(sr.filtered.Type, []arrow.Array{arr}), nil
}
318
319
// listReader produces a list-typed column (List or FixedSizeList) by
// wrapping the element column's reader and rebuilding offsets and
// validity from def/rep levels.
type listReader struct {
	rctx     *readerCtx          // shared reader state
	field    *arrow.Field        // list field being produced
	info     file.LevelInfo      // level info for offset/validity reconstruction
	itemRdr  *ColumnReader       // reader for the list's element column
	props    ArrowReadProperties // arrow-specific read options
	refCount int64               // atomic reference count
}
328
329 func newListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader {
330 childRdr.Retain()
331 return &ColumnReader{&listReader{rctx, field, info, childRdr, props, 1}}
332 }
333
// Retain increments the reader's reference count.
func (lr *listReader) Retain() {
	atomic.AddInt64(&lr.refCount, 1)
}
337
338 func (lr *listReader) Release() {
339 if atomic.AddInt64(&lr.refCount, -1) == 0 {
340 if lr.itemRdr != nil {
341 lr.itemRdr.Release()
342 lr.itemRdr = nil
343 }
344 }
345 }
346
// GetDefLevels delegates to the element reader; the list's def levels
// come from the same underlying leaf.
func (lr *listReader) GetDefLevels() ([]int16, error) {
	return lr.itemRdr.GetDefLevels()
}
350
// GetRepLevels delegates to the element reader; the list's rep levels
// come from the same underlying leaf.
func (lr *listReader) GetRepLevels() ([]int16, error) {
	return lr.itemRdr.GetRepLevels()
}
354
// Field returns the list field this reader produces.
func (lr *listReader) Field() *arrow.Field { return lr.field }
356
// IsOrHasRepeatedChild always reports true: a list is repeated by nature.
func (lr *listReader) IsOrHasRepeatedChild() bool { return true }
358
// LoadBatch loads nrecords of the element column; list offsets and
// validity are derived later in BuildArray.
func (lr *listReader) LoadBatch(nrecords int64) error {
	return lr.itemRdr.LoadBatch(nrecords)
}
362
// BuildArray reconstructs the list column: offsets (and optional
// validity) are derived from def/rep levels, then the element reader
// builds the values array which is wrapped into a single-chunk list array.
func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
	var (
		defLevels      []int16
		repLevels      []int16
		err            error
		validityBuffer *memory.Buffer
	)

	if defLevels, err = lr.itemRdr.GetDefLevels(); err != nil {
		return nil, err
	}
	if repLevels, err = lr.itemRdr.GetRepLevels(); err != nil {
		return nil, err
	}

	validityIO := file.ValidityBitmapInputOutput{ReadUpperBound: lenBound}
	if lr.field.Nullable {
		// only nullable lists need a validity bitmap
		validityBuffer = memory.NewResizableBuffer(lr.rctx.mem)
		validityBuffer.Resize(int(bitutil.BytesForBits(lenBound)))
		defer validityBuffer.Release()
		validityIO.ValidBits = validityBuffer.Bytes()
	}
	// lenBound lists require lenBound+1 int32 offsets
	offsetsBuffer := memory.NewResizableBuffer(lr.rctx.mem)
	offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(lenBound) + 1))
	defer offsetsBuffer.Release()

	offsetData := arrow.Int32Traits.CastFromBytes(offsetsBuffer.Bytes())
	if err = file.DefRepLevelsToListInfo(defLevels, repLevels, lr.info, &validityIO, offsetData); err != nil {
		return nil, err
	}

	// the final offset plus the null count bounds how many elements the
	// child needs to materialize
	arr, err := lr.itemRdr.BuildArray(int64(offsetData[int(validityIO.Read)]) + validityIO.NullCount)
	if err != nil {
		return nil, err
	}
	defer arr.Release()

	// shrink buffers to the lengths actually read
	offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(validityIO.Read) + 1))
	if validityBuffer != nil {
		validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read)))
	}

	item, err := chunksToSingle(arr)
	if err != nil {
		return nil, err
	}
	defer item.Release()

	buffers := []*memory.Buffer{nil, offsetsBuffer}
	if validityIO.NullCount > 0 {
		buffers[0] = validityBuffer
	}

	data := array.NewData(lr.field.Type, int(validityIO.Read), buffers, []arrow.ArrayData{item}, int(validityIO.NullCount), 0)
	defer data.Release()
	if lr.field.Type.ID() == arrow.FIXED_SIZE_LIST {
		// fixed-size lists carry no offsets buffer: validate uniform list
		// sizes before dropping it from the array data.
		// NOTE(review): the loop stops at x < data.Len(), so the size of
		// the last list (x == data.Len()) is never checked — confirm
		// whether that is intentional.
		defer data.Buffers()[1].Release()
		listSize := lr.field.Type.(*arrow.FixedSizeListType).Len()
		for x := 1; x < data.Len(); x++ {
			size := offsetData[x] - offsetData[x-1]
			if size != listSize {
				return nil, fmt.Errorf("expected all lists to be of size=%d, but index %d had size=%d", listSize, x, size)
			}
		}
		data.Buffers()[1] = nil
	}
	out := array.MakeFromData(data)
	defer out.Release()
	return arrow.NewChunked(lr.field.Type, []arrow.Array{out}), nil
}
439
440
// fixedSizeListReader is a listReader for FixedSizeList columns; it
// shares the list logic, and listReader.BuildArray validates that every
// list has the fixed size before dropping the offsets buffer.
type fixedSizeListReader struct {
	listReader
}
444
445 func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader {
446 childRdr.Retain()
447 return &ColumnReader{&fixedSizeListReader{listReader{rctx, field, info, childRdr, props, 1}}}
448 }
449
450
451
452
453 func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
454 switch len(chunked.Chunks()) {
455 case 0:
456 return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
457 case 1:
458 data := chunked.Chunk(0).Data()
459 data.Retain()
460 return data, nil
461 default:
462 return nil, arrow.ErrNotImplemented
463 }
464 }
465
466
467 func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *schema.Column) (*arrow.Chunked, error) {
468 dt := valueType
469 if valueType.ID() == arrow.EXTENSION {
470 dt = valueType.(arrow.ExtensionType).StorageType()
471 }
472
473 var data arrow.ArrayData
474 switch dt.ID() {
475 case arrow.DICTIONARY:
476 return transferDictionary(rdr, valueType), nil
477 case arrow.NULL:
478 return arrow.NewChunked(arrow.Null, []arrow.Array{array.NewNull(rdr.ValuesWritten())}), nil
479 case arrow.INT32, arrow.INT64, arrow.FLOAT32, arrow.FLOAT64:
480 data = transferZeroCopy(rdr, valueType)
481 case arrow.BOOL:
482 data = transferBool(rdr)
483 case arrow.UINT8,
484 arrow.UINT16,
485 arrow.UINT32,
486 arrow.UINT64,
487 arrow.INT8,
488 arrow.INT16,
489 arrow.DATE32,
490 arrow.TIME32,
491 arrow.TIME64:
492 data = transferInt(rdr, valueType)
493 case arrow.DATE64:
494 data = transferDate64(rdr, valueType)
495 case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING:
496 return transferBinary(rdr, valueType), nil
497 case arrow.DECIMAL, arrow.DECIMAL256:
498 switch descr.PhysicalType() {
499 case parquet.Types.Int32, parquet.Types.Int64:
500 data = transferDecimalInteger(rdr, valueType)
501 case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
502 return transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType)
503 default:
504 return nil, errors.New("physical type for decimal128/decimal256 must be int32, int64, bytearray or fixed len byte array")
505 }
506 case arrow.TIMESTAMP:
507 tstype := valueType.(*arrow.TimestampType)
508 switch tstype.Unit {
509 case arrow.Millisecond, arrow.Microsecond:
510 data = transferZeroCopy(rdr, valueType)
511 case arrow.Nanosecond:
512 if descr.PhysicalType() == parquet.Types.Int96 {
513 data = transferInt96(rdr, valueType)
514 } else {
515 data = transferZeroCopy(rdr, valueType)
516 }
517 default:
518 return nil, errors.New("time unit not supported")
519 }
520 case arrow.FLOAT16:
521 if descr.PhysicalType() != parquet.Types.FixedLenByteArray {
522 return nil, errors.New("physical type for float16 must be fixed len byte array")
523 }
524 if len := arrow.Float16SizeBytes; descr.TypeLength() != len {
525 return nil, fmt.Errorf("fixed len byte array length for float16 must be %d", len)
526 }
527 return transferBinary(rdr, valueType), nil
528 default:
529 return nil, fmt.Errorf("no support for reading columns of type: %s", valueType.Name())
530 }
531
532 defer data.Release()
533 arr := array.MakeFromData(data)
534 defer arr.Release()
535 return arrow.NewChunked(valueType, []arrow.Array{arr}), nil
536 }
537
// transferZeroCopy hands the record reader's validity and value buffers
// directly to a new ArrayData without copying; the local references taken
// by ReleaseValidBits/ReleaseValues are dropped once NewData has retained
// the buffers.
func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
	bitmap := rdr.ReleaseValidBits()
	values := rdr.ReleaseValues()
	defer func() {
		if bitmap != nil {
			bitmap.Release()
		}
		if values != nil {
			values.Release()
		}
	}()

	return array.NewData(dt, rdr.ValuesWritten(),
		[]*memory.Buffer{bitmap, values},
		nil, int(rdr.NullCount()), 0)
}
554
555 func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked {
556 brdr := rdr.(file.BinaryRecordReader)
557 if brdr.ReadDictionary() {
558 return transferDictionary(brdr, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt})
559 }
560 chunks := brdr.GetBuilderChunks()
561 defer releaseArrays(chunks)
562
563 switch dt := dt.(type) {
564 case arrow.ExtensionType:
565 for idx, chunk := range chunks {
566 chunks[idx] = array.NewExtensionArrayWithStorage(dt, chunk)
567 chunk.Release()
568 }
569 case *arrow.StringType, *arrow.LargeStringType:
570 for idx, chunk := range chunks {
571 chunks[idx] = array.MakeFromData(chunk.Data())
572 chunk.Release()
573 }
574 case *arrow.Float16Type:
575 for idx, chunk := range chunks {
576 data := chunk.Data()
577 f16_data := array.NewData(dt, data.Len(), data.Buffers(), nil, data.NullN(), data.Offset())
578 defer f16_data.Release()
579 chunks[idx] = array.NewFloat16Data(f16_data)
580 chunk.Release()
581 }
582 }
583 return arrow.NewChunked(dt, chunks)
584 }
585
// transferInt copies the record reader's int32/int64 physical values into
// a freshly allocated buffer of the target arrow type dt (narrower ints,
// unsigned ints, date32, time32/64), using reflection so one loop body
// serves every destination width/signedness.
func transferInt(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
	var (
		output reflect.Value
	)

	signed := true
	// destination buffer sized for the target type's bit width
	data := make([]byte, rdr.ValuesWritten()*int(bitutil.BytesForBits(int64(dt.(arrow.FixedWidthDataType).BitWidth()))))
	switch dt.ID() {
	case arrow.INT8:
		output = reflect.ValueOf(arrow.Int8Traits.CastFromBytes(data))
	case arrow.UINT8:
		signed = false
		output = reflect.ValueOf(arrow.Uint8Traits.CastFromBytes(data))
	case arrow.INT16:
		output = reflect.ValueOf(arrow.Int16Traits.CastFromBytes(data))
	case arrow.UINT16:
		signed = false
		output = reflect.ValueOf(arrow.Uint16Traits.CastFromBytes(data))
	case arrow.UINT32:
		signed = false
		output = reflect.ValueOf(arrow.Uint32Traits.CastFromBytes(data))
	case arrow.UINT64:
		signed = false
		output = reflect.ValueOf(arrow.Uint64Traits.CastFromBytes(data))
	case arrow.DATE32:
		output = reflect.ValueOf(arrow.Date32Traits.CastFromBytes(data))
	case arrow.TIME32:
		output = reflect.ValueOf(arrow.Time32Traits.CastFromBytes(data))
	case arrow.TIME64:
		output = reflect.ValueOf(arrow.Time64Traits.CastFromBytes(data))
	}

	length := rdr.ValuesWritten()

	// copy from the physical representation, converting via SetInt/SetUint
	// so truncation/extension follows the destination type
	switch rdr.Type() {
	case parquet.Types.Int32:
		values := arrow.Int32Traits.CastFromBytes(rdr.Values())
		if signed {
			for idx, v := range values[:length] {
				output.Index(idx).SetInt(int64(v))
			}
		} else {
			for idx, v := range values[:length] {
				output.Index(idx).SetUint(uint64(v))
			}
		}
	case parquet.Types.Int64:
		values := arrow.Int64Traits.CastFromBytes(rdr.Values())
		if signed {
			for idx, v := range values[:length] {
				output.Index(idx).SetInt(v)
			}
		} else {
			for idx, v := range values[:length] {
				output.Index(idx).SetUint(uint64(v))
			}
		}
	}

	bitmap := rdr.ReleaseValidBits()
	if bitmap != nil {
		defer bitmap.Release()
	}

	return array.NewData(dt, rdr.ValuesWritten(), []*memory.Buffer{
		bitmap, memory.NewBufferBytes(data),
	}, nil, int(rdr.NullCount()), 0)
}
657
// transferBool repacks the record reader's byte-per-value booleans into
// an arrow bit-packed boolean array.
func transferBool(rdr file.RecordReader) arrow.ArrayData {
	length := rdr.ValuesWritten()
	data := make([]byte, int(bitutil.BytesForBits(int64(length))))
	bytedata := rdr.Values()
	// reinterpret the byte buffer as []bool in place; assumes the decoder
	// only writes 0/1 bytes — NOTE(review): other byte values would rely
	// on unspecified bool behavior, confirm decoder guarantees.
	values := *(*[]bool)(unsafe.Pointer(&bytedata))

	for idx, v := range values[:length] {
		if v {
			bitutil.SetBit(data, idx)
		}
	}

	bitmap := rdr.ReleaseValidBits()
	if bitmap != nil {
		defer bitmap.Release()
	}
	bb := memory.NewBufferBytes(data)
	defer bb.Release()
	return array.NewData(&arrow.BooleanType{}, length, []*memory.Buffer{
		bitmap, bb,
	}, nil, int(rdr.NullCount()), 0)
}
681
// milliPerDay is the number of milliseconds in one day, used to convert
// parquet DATE (days since epoch) values to arrow date64 milliseconds.
// (24 * time.Hour is already a time.Duration; no conversion needed.)
var milliPerDay = (24 * time.Hour).Milliseconds()
683
684
685
// transferDate64 widens parquet DATE values (int32 days since epoch) into
// arrow date64 values (int64 milliseconds since epoch).
func transferDate64(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
	length := rdr.ValuesWritten()
	values := arrow.Int32Traits.CastFromBytes(rdr.Values())

	data := make([]byte, arrow.Int64Traits.BytesRequired(length))
	out := arrow.Int64Traits.CastFromBytes(data)
	for idx, val := range values[:length] {
		out[idx] = int64(val) * milliPerDay
	}

	bitmap := rdr.ReleaseValidBits()
	if bitmap != nil {
		defer bitmap.Release()
	}
	return array.NewData(dt, length, []*memory.Buffer{
		bitmap, memory.NewBufferBytes(data),
	}, nil, int(rdr.NullCount()), 0)
}
704
705
// transferInt96 converts parquet INT96 values (nanoseconds-in-day plus a
// little-endian Julian-day field in the last 4 bytes) into arrow int64
// nanosecond timestamps.
func transferInt96(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
	length := rdr.ValuesWritten()
	values := parquet.Int96Traits.CastFromBytes(rdr.Values())

	data := make([]byte, arrow.Int64SizeBytes*length)
	out := arrow.Int64Traits.CastFromBytes(data)

	for idx, val := range values[:length] {
		// a Julian-day field of zero is mapped straight to 0 instead of
		// being converted — NOTE(review): presumably guards against
		// unset/garbage values whose conversion would overflow; confirm.
		if binary.LittleEndian.Uint32(val[8:]) == 0 {
			out[idx] = 0
		} else {
			out[idx] = val.ToTime().UnixNano()
		}
	}

	bitmap := rdr.ReleaseValidBits()
	if bitmap != nil {
		defer bitmap.Release()
	}
	return array.NewData(dt, length, []*memory.Buffer{
		bitmap, memory.NewBufferBytes(data),
	}, nil, int(rdr.NullCount()), 0)
}
729
730
// transferDecimalInteger widens int32/int64 physical values into the
// decimal128 or decimal256 representation of the target type dt.
func transferDecimalInteger(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
	length := rdr.ValuesWritten()

	// reflect.Value lets one conversion loop serve both physical widths
	var values reflect.Value
	switch rdr.Type() {
	case parquet.Types.Int32:
		values = reflect.ValueOf(arrow.Int32Traits.CastFromBytes(rdr.Values())[:length])
	case parquet.Types.Int64:
		values = reflect.ValueOf(arrow.Int64Traits.CastFromBytes(rdr.Values())[:length])
	}

	var data []byte
	switch dt.ID() {
	case arrow.DECIMAL128:
		data = make([]byte, arrow.Decimal128Traits.BytesRequired(length))
		out := arrow.Decimal128Traits.CastFromBytes(data)
		for i := 0; i < values.Len(); i++ {
			out[i] = decimal128.FromI64(values.Index(i).Int())
		}
	case arrow.DECIMAL256:
		data = make([]byte, arrow.Decimal256Traits.BytesRequired(length))
		out := arrow.Decimal256Traits.CastFromBytes(data)
		for i := 0; i < values.Len(); i++ {
			out[i] = decimal256.FromI64(values.Index(i).Int())
		}
	}

	// only take ownership of the validity bitmap when nulls exist
	var nullmap *memory.Buffer
	if rdr.NullCount() > 0 {
		nullmap = rdr.ReleaseValidBits()
		defer nullmap.Release()
	}
	return array.NewData(dt, length, []*memory.Buffer{
		nullmap, memory.NewBufferBytes(data),
	}, nil, int(rdr.NullCount()), 0)
}
767
// uint64FromBigEndianShifted decodes up to 8 big-endian bytes into a
// uint64, treating buf as the low-order bytes of the value (high bytes
// are zero-filled).
func uint64FromBigEndianShifted(buf []byte) uint64 {
	var padded [8]byte
	copy(padded[8-len(buf):], buf)
	return binary.BigEndian.Uint64(padded[:])
}
775
776
777
// bigEndianToDecimal128 interprets buf (1-16 bytes of big-endian
// two's-complement) as a decimal128 value, sign-extending inputs shorter
// than 16 bytes.
func bigEndianToDecimal128(buf []byte) (decimal128.Num, error) {
	const (
		minDecimalBytes = 1
		maxDecimalBytes = 16
	)

	if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes {
		return decimal128.Num{}, fmt.Errorf("length of byte array passed to bigEndianToDecimal128 was %d but must be between %d and %d",
			len(buf), minDecimalBytes, maxDecimalBytes)
	}

	// the sign bit lives in the first (most significant) byte
	isNeg := int8(buf[0]) < 0

	// any bytes beyond the low 8 form the high word
	highBitsOffset := utils.Max(0, len(buf)-8)
	var (
		highBits uint64
		lowBits  uint64
		hi       int64
		lo       int64
	)
	highBits = uint64FromBigEndianShifted(buf[:highBitsOffset])

	if highBitsOffset == 8 {
		// full high word present; take it verbatim
		hi = int64(highBits)
	} else {
		// sign-extend into the missing high bytes
		if isNeg && len(buf) < maxDecimalBytes {
			hi = -1
		}

		// shift the sign fill left by the bits occupied by the partial
		// high word, then OR the bytes in
		hi = int64(uint64(hi) << (uint64(highBitsOffset) * 8))
		hi |= int64(highBits)
	}

	// the remaining (up to 8) trailing bytes form the low word
	lowBitsOffset := utils.Min(len(buf), 8)
	lowBits = uint64FromBigEndianShifted(buf[highBitsOffset:])

	if lowBitsOffset == 8 {
		// full low word present; take it verbatim
		lo = int64(lowBits)
	} else {
		// sign-extend into the missing low bytes
		if isNeg && len(buf) < 8 {
			lo = -1
		}

		lo = int64(uint64(lo) << (uint64(lowBitsOffset) * 8))
		lo |= int64(lowBits)
	}

	return decimal128.New(hi, uint64(lo)), nil
}
830
831 func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) {
832 const (
833 minDecimalBytes = 1
834 maxDecimalBytes = 32
835 )
836
837 if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes {
838 return decimal256.Num{},
839 fmt.Errorf("%w: length of byte array for bigEndianToDecimal256 was %d but must be between %d and %d",
840 arrow.ErrInvalid, len(buf), minDecimalBytes, maxDecimalBytes)
841 }
842
843 var littleEndian [4]uint64
844
845
846 initWord, isNeg := uint64(0), int8(buf[0]) < 0
847 if isNeg {
848
849 initWord = uint64(0xFFFFFFFFFFFFFFFF)
850 }
851
852 for wordIdx := 0; wordIdx < 4; wordIdx++ {
853 wordLen := utils.Min(len(buf), arrow.Uint64SizeBytes)
854 word := buf[len(buf)-wordLen:]
855
856 if wordLen == 8 {
857
858 littleEndian[wordIdx] = binary.BigEndian.Uint64(word)
859 } else {
860 result := initWord
861 if len(buf) > 0 {
862
863
864 result = result << uint64(wordLen)
865
866 result |= uint64FromBigEndianShifted(word)
867 }
868 littleEndian[wordIdx] = result
869 }
870
871 buf = buf[:len(buf)-wordLen]
872 }
873
874 return decimal256.New(littleEndian[3], littleEndian[2], littleEndian[1], littleEndian[0]), nil
875 }
876
// varOrFixedBin abstracts over variable-length (Binary) and
// FixedSizeBinary arrays: any arrow array exposing per-index byte values.
type varOrFixedBin interface {
	arrow.Array
	Value(i int) []byte
}
881
882
// transferDecimalBytes converts (fixed-len) byte-array chunks holding
// big-endian two's-complement values into decimal128/decimal256 arrays of
// the target type dt.
func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arrow.Chunked, error) {
	// convert128 rebuilds one binary chunk as a decimal128 array, reusing
	// the chunk's validity buffer
	convert128 := func(in varOrFixedBin) (arrow.Array, error) {
		length := in.Len()
		data := make([]byte, arrow.Decimal128Traits.BytesRequired(length))
		out := arrow.Decimal128Traits.CastFromBytes(data)

		nullCount := in.NullN()
		var err error
		for i := 0; i < length; i++ {
			if nullCount > 0 && in.IsNull(i) {
				continue
			}

			rec := in.Value(i)
			if len(rec) <= 0 {
				return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt)
			}
			out[i], err = bigEndianToDecimal128(rec)
			if err != nil {
				return nil, err
			}
		}

		ret := array.NewData(dt, length, []*memory.Buffer{
			in.Data().Buffers()[0], memory.NewBufferBytes(data),
		}, nil, nullCount, 0)
		defer ret.Release()
		return array.MakeFromData(ret), nil
	}

	// convert256 is the decimal256 analogue of convert128
	convert256 := func(in varOrFixedBin) (arrow.Array, error) {
		length := in.Len()
		data := make([]byte, arrow.Decimal256Traits.BytesRequired(length))
		out := arrow.Decimal256Traits.CastFromBytes(data)

		nullCount := in.NullN()
		var err error
		for i := 0; i < length; i++ {
			if nullCount > 0 && in.IsNull(i) {
				continue
			}

			rec := in.Value(i)
			if len(rec) <= 0 {
				return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt)
			}
			out[i], err = bigEndianToDecimal256(rec)
			if err != nil {
				return nil, err
			}
		}

		ret := array.NewData(dt, length, []*memory.Buffer{
			in.Data().Buffers()[0], memory.NewBufferBytes(data),
		}, nil, nullCount, 0)
		defer ret.Release()
		return array.MakeFromData(ret), nil
	}

	// dispatch on the target decimal width
	convert := func(arr arrow.Array) (arrow.Array, error) {
		switch dt.ID() {
		case arrow.DECIMAL128:
			return convert128(arr.(varOrFixedBin))
		case arrow.DECIMAL256:
			return convert256(arr.(varOrFixedBin))
		}
		return nil, arrow.ErrNotImplemented
	}

	chunks := rdr.GetBuilderChunks()
	var err error
	for idx, chunk := range chunks {
		// the deferred releases drop this function's references after
		// NewChunked below has retained the converted chunks
		defer chunk.Release()
		if chunks[idx], err = convert(chunk); err != nil {
			return nil, err
		}
		defer chunks[idx].Release()
	}
	return arrow.NewChunked(dt, chunks), nil
}
963
// transferDictionary passes the dictionary-encoded builder chunks through
// as a chunked array of the logical dictionary type; NewChunked retains
// the chunks before the deferred release drops this function's references.
func transferDictionary(rdr file.RecordReader, logicalValueType arrow.DataType) *arrow.Chunked {
	brdr := rdr.(file.BinaryRecordReader)
	chunks := brdr.GetBuilderChunks()
	defer releaseArrays(chunks)
	return arrow.NewChunked(logicalValueType, chunks)
}
970
View as plain text