1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "fmt"
21 "sync"
22 "sync/atomic"
23 "unsafe"
24
25 "github.com/JohnCGriffin/overflow"
26 "github.com/apache/arrow/go/v15/arrow"
27 "github.com/apache/arrow/go/v15/arrow/array"
28 "github.com/apache/arrow/go/v15/arrow/bitutil"
29 "github.com/apache/arrow/go/v15/arrow/memory"
30 "github.com/apache/arrow/go/v15/internal/utils"
31 "github.com/apache/arrow/go/v15/parquet"
32 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
33 "github.com/apache/arrow/go/v15/parquet/schema"
34 "golang.org/x/xerrors"
35 )
36
37
38
39
40 type RecordReader interface {
41
42 DefLevels() []int16
43
44
45 LevelsPos() int64
46
47 RepLevels() []int16
48
49
50 Reset()
51
52 Reserve(int64) error
53
54
55 HasMore() bool
56
57
58 ReadRecords(num int64) (int64, error)
59
60 ValuesWritten() int
61
62
63 ReleaseValidBits() *memory.Buffer
64
65
66 ReleaseValues() *memory.Buffer
67
68 NullCount() int64
69
70 Type() parquet.Type
71
72
73 Values() []byte
74
75
76 SetPageReader(PageReader)
77
78 Retain()
79
80
81 Release()
82 }
83
84
85
86 type BinaryRecordReader interface {
87 RecordReader
88 GetBuilderChunks() []arrow.Array
89 ReadDictionary() bool
90 }
91
92
93
94 type recordReaderImpl interface {
95 ColumnChunkReader
96 ReadValuesDense(int64) error
97 ReadValuesSpaced(int64, int64) error
98 ReserveValues(int64, bool) error
99 ResetValues()
100 GetValidBits() []byte
101 IncrementWritten(int64, int64)
102 ValuesWritten() int64
103 ReleaseValidBits() *memory.Buffer
104 ReleaseValues() *memory.Buffer
105 NullCount() int64
106 Values() []byte
107 SetPageReader(PageReader)
108 Retain()
109 Release()
110 }
111
112 type binaryRecordReaderImpl interface {
113 recordReaderImpl
114 GetBuilderChunks() []arrow.Array
115 ReadDictionary() bool
116 }
117
118
// primitiveRecordReader is the recordReaderImpl used for physical types other
// than ByteArray and FixedLenByteArray. It decodes values into a resizable
// byte buffer and keeps a validity bitmap for nullable columns. The binary
// record readers embed it too, but override value decoding and keep their
// values in Arrow builders.
type primitiveRecordReader struct {
	ColumnChunkReader

	valuesWritten int64          // value slots written so far, including null slots
	valuesCap     int64          // capacity, in values, reserved via ReserveValues
	nullCount     int64          // number of null slots among valuesWritten
	values        *memory.Buffer // decoded values; only grown when useValues is true
	validBits     *memory.Buffer // validity bitmap, one bit per value slot
	mem           memory.Allocator

	refCount  int64 // reference count, updated with sync/atomic
	useValues bool  // false for ByteArray and FixedLenByteArray columns
}
132
133 func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
134 return primitiveRecordReader{
135 ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
136 values: memory.NewResizableBuffer(mem),
137 validBits: memory.NewResizableBuffer(mem),
138 mem: mem,
139 refCount: 1,
140 useValues: descr.PhysicalType() != parquet.Types.ByteArray && descr.PhysicalType() != parquet.Types.FixedLenByteArray,
141 }
142 }
143
144 func (pr *primitiveRecordReader) Retain() {
145 atomic.AddInt64(&pr.refCount, 1)
146 }
147
148 func (pr *primitiveRecordReader) Release() {
149 if atomic.AddInt64(&pr.refCount, -1) == 0 {
150 if pr.values != nil {
151 pr.values.Release()
152 pr.values = nil
153 }
154 if pr.validBits != nil {
155 pr.validBits.Release()
156 pr.validBits = nil
157 }
158 }
159 }
160
161 func (pr *primitiveRecordReader) SetPageReader(rdr PageReader) {
162 pr.ColumnChunkReader.setPageReader(rdr)
163 }
164
165 func (pr *primitiveRecordReader) ReleaseValidBits() *memory.Buffer {
166 res := pr.validBits
167 res.Resize(int(bitutil.BytesForBits(pr.valuesWritten)))
168 pr.validBits = memory.NewResizableBuffer(pr.mem)
169 return res
170 }
171
172 func (pr *primitiveRecordReader) ReleaseValues() (res *memory.Buffer) {
173 res = pr.values
174 nbytes, err := pr.numBytesForValues(pr.valuesWritten)
175 if err != nil {
176 panic(err)
177 }
178 res.Resize(int(nbytes))
179 pr.values = memory.NewResizableBuffer(pr.mem)
180 pr.valuesCap = 0
181
182 return
183 }
184
185 func (pr *primitiveRecordReader) NullCount() int64 { return pr.nullCount }
186
187 func (pr *primitiveRecordReader) IncrementWritten(w, n int64) {
188 pr.valuesWritten += w
189 pr.nullCount += n
190 }
191 func (pr *primitiveRecordReader) GetValidBits() []byte { return pr.validBits.Bytes() }
192 func (pr *primitiveRecordReader) ValuesWritten() int64 { return pr.valuesWritten }
193 func (pr *primitiveRecordReader) Values() []byte { return pr.values.Bytes() }
194 func (pr *primitiveRecordReader) ResetValues() {
195 if pr.valuesWritten > 0 {
196 pr.values.ResizeNoShrink(0)
197 pr.validBits.ResizeNoShrink(0)
198 pr.valuesWritten = 0
199 pr.valuesCap = 0
200 pr.nullCount = 0
201 }
202 }
203
204 func (pr *primitiveRecordReader) numBytesForValues(nitems int64) (num int64, err error) {
205 typeSize := int64(pr.Descriptor().PhysicalType().ByteSize())
206 var ok bool
207 if num, ok = overflow.Mul64(nitems, typeSize); !ok {
208 err = xerrors.New("total size of items too large")
209 }
210 return
211 }
212
213 func (pr *primitiveRecordReader) ReserveValues(extra int64, hasNullable bool) error {
214 newCap, err := updateCapacity(pr.valuesCap, pr.valuesWritten, extra)
215 if err != nil {
216 return err
217 }
218 if newCap > pr.valuesCap {
219 capBytes, err := pr.numBytesForValues(newCap)
220 if err != nil {
221 return err
222 }
223 if pr.useValues {
224 pr.values.ResizeNoShrink(int(capBytes))
225 }
226 pr.valuesCap = newCap
227 }
228 if hasNullable {
229 validBytesCap := bitutil.BytesForBits(pr.valuesCap)
230 if pr.validBits.Len() < int(validBytesCap) {
231 pr.validBits.ResizeNoShrink(int(validBytesCap))
232 }
233 }
234 return nil
235 }
236
237 func (pr *primitiveRecordReader) ReadValuesDense(toRead int64) (err error) {
238 switch cr := pr.ColumnChunkReader.(type) {
239 case *BooleanColumnChunkReader:
240 data := pr.values.Bytes()[int(pr.valuesWritten):]
241 values := *(*[]bool)(unsafe.Pointer(&data))
242 _, err = cr.curDecoder.(encoding.BooleanDecoder).Decode(values[:toRead])
243 case *Int32ColumnChunkReader:
244 values := arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
245 _, err = cr.curDecoder.(encoding.Int32Decoder).Decode(values[:toRead])
246 case *Int64ColumnChunkReader:
247 values := arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
248 _, err = cr.curDecoder.(encoding.Int64Decoder).Decode(values[:toRead])
249 case *Int96ColumnChunkReader:
250 values := parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
251 _, err = cr.curDecoder.(encoding.Int96Decoder).Decode(values[:toRead])
252 case *ByteArrayColumnChunkReader:
253 values := parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
254 _, err = cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[:toRead])
255 case *FixedLenByteArrayColumnChunkReader:
256 values := parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
257 _, err = cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[:toRead])
258 case *Float32ColumnChunkReader:
259 values := arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
260 _, err = cr.curDecoder.(encoding.Float32Decoder).Decode(values[:toRead])
261 case *Float64ColumnChunkReader:
262 values := arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
263 _, err = cr.curDecoder.(encoding.Float64Decoder).Decode(values[:toRead])
264 default:
265 panic("invalid type for record reader")
266 }
267 return
268 }
269
270 func (pr *primitiveRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) (err error) {
271 validBits := pr.validBits.Bytes()
272 offset := pr.valuesWritten
273
274 switch cr := pr.ColumnChunkReader.(type) {
275 case *BooleanColumnChunkReader:
276 data := pr.values.Bytes()[int(pr.valuesWritten):]
277 values := *(*[]bool)(unsafe.Pointer(&data))
278 _, err = cr.curDecoder.(encoding.BooleanDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
279 case *Int32ColumnChunkReader:
280 values := arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
281 _, err = cr.curDecoder.(encoding.Int32Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
282 case *Int64ColumnChunkReader:
283 values := arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
284 _, err = cr.curDecoder.(encoding.Int64Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
285 case *Int96ColumnChunkReader:
286 values := parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
287 _, err = cr.curDecoder.(encoding.Int96Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
288 case *ByteArrayColumnChunkReader:
289 values := parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
290 _, err = cr.curDecoder.(encoding.ByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
291 case *FixedLenByteArrayColumnChunkReader:
292 values := parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
293 _, err = cr.curDecoder.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
294 case *Float32ColumnChunkReader:
295 values := arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
296 _, err = cr.curDecoder.(encoding.Float32Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
297 case *Float64ColumnChunkReader:
298 values := arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
299 _, err = cr.curDecoder.(encoding.Float64Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset)
300 default:
301 panic("invalid type for record reader")
302 }
303 return
304 }
305
// recordReader layers record assembly on top of a recordReaderImpl. It
// buffers definition and repetition levels, tracks how many of them have
// been consumed, and uses them to find record boundaries.
type recordReader struct {
	recordReaderImpl
	leafInfo LevelInfo // level metadata of the leaf column being read

	atRecStart  bool  // true while positioned at the start of a record
	recordsRead int64 // cleared by Reset

	levelsWritten int64 // number of levels buffered in defLevels/repLevels
	levelsPos     int64 // index of the first buffered level not yet consumed
	levelsCap     int64 // capacity, in levels, reserved by reserveLevels

	defLevels *memory.Buffer // int16 definition levels
	repLevels *memory.Buffer // int16 repetition levels; only grown for repeated columns

	refCount int64 // reference count, updated with sync/atomic
}
322
323
324 type binaryRecordReader struct {
325 *recordReader
326 }
327
328 func (b *binaryRecordReader) ReadDictionary() bool {
329 return b.recordReaderImpl.(binaryRecordReaderImpl).ReadDictionary()
330 }
331
332 func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array {
333 return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
334 }
335
336 func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
337 if mem == nil {
338 mem = memory.DefaultAllocator
339 }
340
341 pr := createPrimitiveRecordReader(descr, mem, bufferPool)
342 return &recordReader{
343 refCount: 1,
344 recordReaderImpl: &pr,
345 leafInfo: info,
346 defLevels: memory.NewResizableBuffer(mem),
347 repLevels: memory.NewResizableBuffer(mem),
348 }
349 }
350
351 func (rr *recordReader) Retain() {
352 atomic.AddInt64(&rr.refCount, 1)
353 }
354
355 func (rr *recordReader) Release() {
356 if atomic.AddInt64(&rr.refCount, -1) == 0 {
357 rr.recordReaderImpl.Release()
358 rr.defLevels.Release()
359 rr.repLevels.Release()
360 rr.defLevels, rr.repLevels = nil, nil
361 }
362 }
363
364 func (rr *recordReader) DefLevels() []int16 {
365 return arrow.Int16Traits.CastFromBytes(rr.defLevels.Bytes())
366 }
367
368 func (rr *recordReader) RepLevels() []int16 {
369 return arrow.Int16Traits.CastFromBytes(rr.repLevels.Bytes())
370 }
371
372 func (rr *recordReader) HasMore() bool {
373 return rr.pager() != nil
374 }
375
376 func (rr *recordReader) SetPageReader(pr PageReader) {
377 rr.atRecStart = true
378 rr.recordReaderImpl.SetPageReader(pr)
379 }
380
381 func (rr *recordReader) ValuesWritten() int {
382 return int(rr.recordReaderImpl.ValuesWritten())
383 }
384
385 func (rr *recordReader) LevelsPos() int64 { return rr.levelsPos }
386
387 func updateCapacity(cap, size, extra int64) (int64, error) {
388 if extra < 0 {
389 return 0, xerrors.New("negative size (corrupt file?)")
390 }
391 target, ok := overflow.Add64(size, extra)
392 if !ok {
393 return 0, xerrors.New("allocation size too large (corrupt file?)")
394 }
395 if target >= (1 << 62) {
396 return 0, xerrors.New("allocation size too large (corrupt file?)")
397 }
398 if cap >= target {
399 return cap, nil
400 }
401 return int64(bitutil.NextPowerOf2(int(target))), nil
402 }
403
404 func (rr *recordReader) Reserve(cap int64) error {
405 if err := rr.reserveLevels(cap); err != nil {
406 return err
407 }
408 if err := rr.reserveValues(cap); err != nil {
409 return err
410 }
411 return nil
412 }
413
414 func (rr *recordReader) reserveLevels(extra int64) error {
415 if rr.Descriptor().MaxDefinitionLevel() > 0 {
416 newCap, err := updateCapacity(rr.levelsCap, rr.levelsWritten, extra)
417 if err != nil {
418 return err
419 }
420
421 if newCap > rr.levelsCap {
422 capBytes, ok := overflow.Mul(int(newCap), arrow.Int16SizeBytes)
423 if !ok {
424 return fmt.Errorf("allocation size too large (corrupt file?)")
425 }
426 rr.defLevels.ResizeNoShrink(capBytes)
427 if rr.Descriptor().MaxRepetitionLevel() > 0 {
428 rr.repLevels.ResizeNoShrink(capBytes)
429 }
430 rr.levelsCap = newCap
431 }
432 }
433 return nil
434 }
435
436 func (rr *recordReader) reserveValues(extra int64) error {
437 return rr.recordReaderImpl.ReserveValues(extra, rr.leafInfo.HasNullableValues())
438 }
439
440 func (rr *recordReader) resetValues() {
441 rr.recordReaderImpl.ResetValues()
442 }
443
444 func (rr *recordReader) Reset() {
445 rr.resetValues()
446
447 if rr.levelsWritten > 0 {
448 remain := int(rr.levelsWritten - rr.levelsPos)
449
450
451 defData := rr.DefLevels()
452
453 copy(defData, defData[int(rr.levelsPos):int(rr.levelsWritten)])
454 rr.defLevels.ResizeNoShrink(remain * int(arrow.Int16SizeBytes))
455
456 if rr.Descriptor().MaxRepetitionLevel() > 0 {
457 repData := rr.RepLevels()
458 copy(repData, repData[int(rr.levelsPos):int(rr.levelsWritten)])
459 rr.repLevels.ResizeNoShrink(remain * int(arrow.Int16SizeBytes))
460 }
461
462 rr.levelsWritten -= rr.levelsPos
463 rr.levelsPos = 0
464 rr.levelsCap = int64(remain)
465 }
466
467 rr.recordsRead = 0
468 }
469
470
471
472
473
474 func (rr *recordReader) delimitRecords(numRecords int64) (recordsRead, valsToRead int64) {
475 var (
476 curRep int16
477 curDef int16
478 )
479
480 defLevels := rr.DefLevels()[int(rr.levelsPos):]
481 repLevels := rr.RepLevels()[int(rr.levelsPos):]
482
483 for rr.levelsPos < rr.levelsWritten {
484 curRep, repLevels = repLevels[0], repLevels[1:]
485 if curRep == 0 {
486
487
488
489
490 if !rr.atRecStart {
491
492 recordsRead++
493 if recordsRead == numRecords {
494
495 rr.atRecStart = true
496 break
497 }
498 }
499 }
500
501
502 rr.atRecStart = false
503
504 curDef, defLevels = defLevels[0], defLevels[1:]
505 if curDef == rr.Descriptor().MaxDefinitionLevel() {
506 valsToRead++
507 }
508 rr.levelsPos++
509 }
510 return
511 }
512
513 func (rr *recordReader) ReadRecordData(numRecords int64) (int64, error) {
514 possibleNum := utils.Max(numRecords, rr.levelsWritten-rr.levelsPos)
515 if err := rr.reserveValues(possibleNum); err != nil {
516 return 0, err
517 }
518
519 var (
520 startPos = rr.levelsPos
521 valuesToRead int64
522 recordsRead int64
523 nullCount int64
524 err error
525 )
526
527 if rr.Descriptor().MaxRepetitionLevel() > 0 {
528 recordsRead, valuesToRead = rr.delimitRecords(numRecords)
529 } else if rr.Descriptor().MaxDefinitionLevel() > 0 {
530
531
532 recordsRead = utils.Min(rr.levelsWritten-rr.levelsPos, numRecords)
533
534 rr.levelsPos += recordsRead
535 } else {
536 recordsRead, valuesToRead = numRecords, numRecords
537 }
538
539 if rr.leafInfo.HasNullableValues() {
540 validityIO := ValidityBitmapInputOutput{
541 ReadUpperBound: rr.levelsPos - startPos,
542 ValidBits: rr.GetValidBits(),
543 ValidBitsOffset: rr.recordReaderImpl.ValuesWritten(),
544 }
545 DefLevelsToBitmap(rr.DefLevels()[startPos:int(rr.levelsPos)], rr.leafInfo, &validityIO)
546 valuesToRead = validityIO.Read - validityIO.NullCount
547 nullCount = validityIO.NullCount
548 err = rr.ReadValuesSpaced(validityIO.Read, nullCount)
549 } else {
550 err = rr.ReadValuesDense(valuesToRead)
551 }
552 if err != nil {
553 return 0, err
554 }
555
556 if rr.leafInfo.DefLevel > 0 {
557 rr.consumeBufferedValues(rr.levelsPos - startPos)
558 } else {
559 rr.consumeBufferedValues(valuesToRead)
560 }
561
562
563 rr.IncrementWritten(valuesToRead+nullCount, nullCount)
564 return recordsRead, nil
565 }
566
// minLevelBatchSize is the minimum number of levels ReadRecords asks the
// column reader to decode per batch.
const minLevelBatchSize = 1024

// ReadRecords reads up to numRecords records and returns how many were read.
//
// Levels still buffered from an earlier call are turned into records first.
// After that, levels are decoded in batches of max(minLevelBatchSize,
// numRecords), capped by rr.numAvailValues(). This continues until numRecords
// records have been read and the reader sits on a record boundary, or no more
// data is available. If the data runs out while inside a record, that record
// is counted as complete.
//
// Some errors return 0: a failure in the initial buffered read, in level
// reservation, or a mismatch between decoded rep/def level counts. Errors
// from ReadRecordData inside the batch loop return the records read so far.
func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) {
	// Delimit records from the levels, then read the matching values.
	recordsRead := int64(0)

	// Consume levels left over from a previous call before decoding more.
	if rr.levelsPos < rr.levelsWritten {
		additional, err := rr.ReadRecordData(numRecords)
		if err != nil {
			return 0, err
		}
		recordsRead += additional
	}

	levelBatch := utils.Max(minLevelBatchSize, numRecords)

	// Keep going while in the middle of a record (so it is finished) or while
	// fewer than numRecords records have been read.
	for !rr.atRecStart || recordsRead < numRecords {
		// Stop when the column reader reports no further data.
		if !rr.HasNext() {
			if !rr.atRecStart {
				// The data ended inside a record whose end has not been
				// seen; count that final record.
				recordsRead++
				rr.atRecStart = true
			}
			break
		}

		// Decode at most levelBatch levels per iteration, bounded by the
		// values the column reader reports as available.
		batchSize := utils.Min(levelBatch, rr.numAvailValues())
		if batchSize == 0 {
			// no more data in column
			break
		}

		if rr.Descriptor().MaxDefinitionLevel() > 0 {
			if err := rr.reserveLevels(batchSize); err != nil {
				return 0, err
			}

			defLevels := rr.DefLevels()[int(rr.levelsWritten):]

			levelsRead := 0

			// Repetition levels are only decoded for repeated columns, and
			// must yield the same count as the definition levels.
			if rr.Descriptor().MaxRepetitionLevel() > 0 {
				repLevels := rr.RepLevels()[int(rr.levelsWritten):]
				levelsRead, _ = rr.readDefinitionLevels(defLevels[:batchSize])
				if rr.readRepetitionLevels(repLevels[:batchSize]) != levelsRead {
					return 0, xerrors.New("number of decoded rep/def levels did not match")
				}
			} else if rr.Descriptor().MaxDefinitionLevel() > 0 {
				// NOTE(review): this condition is always true here because the
				// enclosing if already checked it; a plain else would do.
				levelsRead, _ = rr.readDefinitionLevels(defLevels[:batchSize])
			}

			if levelsRead == 0 {
				// Nothing more could be decoded; presumably the column chunk
				// is exhausted.
				break
			}

			rr.levelsWritten += int64(levelsRead)
			read, err := rr.ReadRecordData(numRecords - recordsRead)
			if err != nil {
				return recordsRead, err
			}
			recordsRead += read
		} else {
			// No definition levels: read at most the number of records still
			// wanted.
			batchSize = utils.Min(numRecords-recordsRead, batchSize)
			read, err := rr.ReadRecordData(batchSize)
			if err != nil {
				return recordsRead, err
			}
			recordsRead += read
		}
	}

	return recordsRead, nil
}
650
651 func (rr *recordReader) ReleaseValidBits() *memory.Buffer {
652 if rr.leafInfo.HasNullableValues() {
653 return rr.recordReaderImpl.ReleaseValidBits()
654 }
655 return nil
656 }
657
658
659
// flbaRecordReader reads FixedLenByteArray columns. Decoded values are
// appended to a FixedSizeBinaryBuilder instead of the primitive values buffer.
type flbaRecordReader struct {
	primitiveRecordReader

	bldr     *array.FixedSizeBinaryBuilder // accumulates the decoded values
	valueBuf []parquet.FixedLenByteArray   // scratch slice reused across decode calls
}
666
667 func (fr *flbaRecordReader) ReserveValues(extra int64, hasNullable bool) error {
668 fr.bldr.Reserve(int(extra))
669 return fr.primitiveRecordReader.ReserveValues(extra, hasNullable)
670 }
671
672 func (fr *flbaRecordReader) Retain() {
673 fr.bldr.Retain()
674 fr.primitiveRecordReader.Retain()
675 }
676
677 func (fr *flbaRecordReader) Release() {
678 fr.bldr.Release()
679 fr.primitiveRecordReader.Release()
680 }
681
682 func (fr *flbaRecordReader) ReadValuesDense(toRead int64) error {
683 if int64(cap(fr.valueBuf)) < toRead {
684 fr.valueBuf = make([]parquet.FixedLenByteArray, 0, toRead)
685 }
686
687 values := fr.valueBuf[:toRead]
688 dec := fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder)
689
690 _, err := dec.Decode(values)
691 if err != nil {
692 return err
693 }
694
695 for _, val := range values {
696 fr.bldr.Append(val)
697 }
698 fr.ResetValues()
699 return nil
700 }
701
702 func (fr *flbaRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) error {
703 validBits := fr.validBits.Bytes()
704 offset := fr.valuesWritten
705
706 if int64(cap(fr.valueBuf)) < valuesWithNulls {
707 fr.valueBuf = make([]parquet.FixedLenByteArray, 0, valuesWithNulls)
708 }
709
710 values := fr.valueBuf[:valuesWithNulls]
711 dec := fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder)
712 _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset)
713 if err != nil {
714 return err
715 }
716
717 for idx, val := range values {
718 if bitutil.BitIsSet(validBits, int(offset)+idx) {
719 fr.bldr.Append(val)
720 } else {
721 fr.bldr.AppendNull()
722 }
723 }
724 fr.ResetValues()
725 return nil
726 }
727
728 func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array {
729 return []arrow.Array{fr.bldr.NewArray()}
730 }
731
732 func (fr *flbaRecordReader) ReadDictionary() bool { return false }
733
734 func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
735 if mem == nil {
736 mem = memory.DefaultAllocator
737 }
738
739 byteWidth := descr.TypeLength()
740
741 return &binaryRecordReader{&recordReader{
742 recordReaderImpl: &flbaRecordReader{
743 createPrimitiveRecordReader(descr, mem, bufferPool),
744 array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
745 nil,
746 },
747 leafInfo: info,
748 defLevels: memory.NewResizableBuffer(mem),
749 repLevels: memory.NewResizableBuffer(mem),
750 refCount: 1,
751 }}
752 }
753
754
// byteArrayRecordReader reads ByteArray columns. Decoded values are appended
// to an Arrow builder instead of the primitive values buffer.
type byteArrayRecordReader struct {
	primitiveRecordReader

	bldr     array.Builder       // *array.BinaryBuilder, or a dictionary builder when embedded in byteArrayDictRecordReader
	valueBuf []parquet.ByteArray // scratch slice reused across decode calls
}
761
762 func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
763 if mem == nil {
764 mem = memory.DefaultAllocator
765 }
766
767 dt, ok := dtype.(arrow.BinaryDataType)
768
769 if !ok {
770 dt = arrow.BinaryTypes.Binary
771 }
772
773 return &binaryRecordReader{&recordReader{
774 recordReaderImpl: &byteArrayRecordReader{
775 createPrimitiveRecordReader(descr, mem, bufferPool),
776 array.NewBinaryBuilder(mem, dt),
777 nil,
778 },
779 leafInfo: info,
780 defLevels: memory.NewResizableBuffer(mem),
781 repLevels: memory.NewResizableBuffer(mem),
782 refCount: 1,
783 }}
784 }
785
786 func (br *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool) error {
787 br.bldr.Reserve(int(extra))
788 return br.primitiveRecordReader.ReserveValues(extra, hasNullable)
789 }
790
791 func (br *byteArrayRecordReader) Retain() {
792 br.bldr.Retain()
793 br.primitiveRecordReader.Retain()
794 }
795
796 func (br *byteArrayRecordReader) Release() {
797 br.bldr.Release()
798 br.primitiveRecordReader.Release()
799 }
800
801 func (br *byteArrayRecordReader) ReadValuesDense(toRead int64) error {
802 if int64(cap(br.valueBuf)) < toRead {
803 br.valueBuf = make([]parquet.ByteArray, 0, toRead)
804 }
805
806 values := br.valueBuf[:toRead]
807 dec := br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
808
809 _, err := dec.Decode(values)
810 if err != nil {
811 return err
812 }
813
814 switch bldr := br.bldr.(type) {
815 case *array.BinaryBuilder:
816 for _, val := range values {
817 bldr.Append(val)
818 }
819 case *array.BinaryDictionaryBuilder:
820 for _, val := range values {
821 if err := bldr.Append(val); err != nil {
822 return err
823 }
824 }
825 }
826
827 br.ResetValues()
828 return nil
829 }
830
831 func (br *byteArrayRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) error {
832 validBits := br.validBits.Bytes()
833 offset := br.valuesWritten
834
835 if int64(cap(br.valueBuf)) < valuesWithNulls {
836 br.valueBuf = make([]parquet.ByteArray, 0, valuesWithNulls)
837 }
838
839 values := br.valueBuf[:valuesWithNulls]
840 dec := br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
841 _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset)
842 if err != nil {
843 return err
844 }
845
846 switch bldr := br.bldr.(type) {
847 case *array.BinaryBuilder:
848 for idx, val := range values {
849 if bitutil.BitIsSet(validBits, int(offset)+idx) {
850 bldr.Append(val)
851 } else {
852 bldr.AppendNull()
853 }
854 }
855 case *array.BinaryDictionaryBuilder:
856 for idx, val := range values {
857 if bitutil.BitIsSet(validBits, int(offset)+idx) {
858 if err := bldr.Append(val); err != nil {
859 return err
860 }
861 } else {
862 bldr.AppendNull()
863 }
864 }
865 }
866
867 br.ResetValues()
868 return nil
869 }
870
871 func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
872 return []arrow.Array{br.bldr.NewArray()}
873 }
874
875 func (br *byteArrayRecordReader) ReadDictionary() bool { return false }
876
// byteArrayDictRecordReader reads ByteArray columns into dictionary arrays.
// Whenever the column switches to a new dictionary, the chunk built so far
// is flushed into resultChunks.
type byteArrayDictRecordReader struct {
	byteArrayRecordReader

	resultChunks []arrow.Array // flushed chunks not yet returned by GetBuilderChunks
}
882
883 func newByteArrayDictRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
884 if mem == nil {
885 mem = memory.DefaultAllocator
886 }
887
888 dt := dtype.(*arrow.DictionaryType)
889 if _, ok := dt.ValueType.(arrow.BinaryDataType); !ok {
890 dt.ValueType = arrow.BinaryTypes.Binary
891 }
892
893 return &binaryRecordReader{&recordReader{
894 recordReaderImpl: &byteArrayDictRecordReader{
895 byteArrayRecordReader: byteArrayRecordReader{
896 createPrimitiveRecordReader(descr, mem, bufferPool),
897 array.NewDictionaryBuilder(mem, dt),
898 nil,
899 },
900 resultChunks: make([]arrow.Array, 0),
901 },
902 leafInfo: info,
903 defLevels: memory.NewResizableBuffer(mem),
904 repLevels: memory.NewResizableBuffer(mem),
905 refCount: 1,
906 }}
907 }
908
909 func (bd *byteArrayDictRecordReader) GetBuilderChunks() []arrow.Array {
910 bd.flushBuilder()
911 chunks := bd.resultChunks
912 bd.resultChunks = make([]arrow.Array, 0, 1)
913 return chunks
914 }
915
916 func (bd *byteArrayDictRecordReader) flushBuilder() {
917 if bd.bldr.Len() > 0 {
918 chunk := bd.bldr.NewArray()
919 bd.resultChunks = append(bd.resultChunks, chunk)
920 }
921 }
922
923 func (bd *byteArrayDictRecordReader) maybeWriteNewDictionary() error {
924 rdr := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader)
925 if rdr.newDictionary {
926
927
928 bd.flushBuilder()
929 bd.bldr.(*array.BinaryDictionaryBuilder).ResetFull()
930 dec := rdr.curDecoder.(*encoding.DictByteArrayDecoder)
931 if err := dec.InsertDictionary(bd.bldr); err != nil {
932 return err
933 }
934 rdr.newDictionary = false
935 }
936 return nil
937 }
938
939 func (bd *byteArrayDictRecordReader) ReadValuesDense(toRead int64) error {
940 dec := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
941 if dec.Encoding() == parquet.Encodings.RLEDict {
942 if err := bd.maybeWriteNewDictionary(); err != nil {
943 return err
944 }
945
946 rdr := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader)
947 _, err := rdr.curDecoder.(*encoding.DictByteArrayDecoder).DecodeIndices(int(toRead), bd.bldr)
948 return err
949 }
950 return bd.byteArrayRecordReader.ReadValuesDense(toRead)
951 }
952
953 func (bd *byteArrayDictRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) error {
954 validBits := bd.validBits.Bytes()
955 offset := bd.valuesWritten
956
957 dec := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
958 if dec.Encoding() == parquet.Encodings.RLEDict {
959 if err := bd.maybeWriteNewDictionary(); err != nil {
960 return err
961 }
962
963 rdr := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader)
964 _, err := rdr.curDecoder.(*encoding.DictByteArrayDecoder).DecodeIndicesSpaced(int(valuesWithNulls), int(nullCount), validBits, offset, bd.bldr)
965 return err
966
967 }
968
969 return bd.byteArrayRecordReader.ReadValuesSpaced(valuesWithNulls, int64(nullCount))
970 }
971
972 func (bd *byteArrayDictRecordReader) ReadDictionary() bool { return true }
973
974 func NewRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
975 switch descr.PhysicalType() {
976 case parquet.Types.ByteArray:
977 if dtype.ID() == arrow.DICTIONARY {
978 return newByteArrayDictRecordReader(descr, info, dtype, mem, bufferPool)
979 }
980 return newByteArrayRecordReader(descr, info, dtype, mem, bufferPool)
981 case parquet.Types.FixedLenByteArray:
982 return newFLBARecordReader(descr, info, mem, bufferPool)
983 default:
984 return newRecordReader(descr, info, mem, bufferPool)
985 }
986 }
987
View as plain text