1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "errors"
21 "fmt"
22 "sync"
23
24 "github.com/apache/arrow/go/v15/arrow/memory"
25 "github.com/apache/arrow/go/v15/internal/utils"
26 "github.com/apache/arrow/go/v15/parquet"
27 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
28 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
29 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
30 "github.com/apache/arrow/go/v15/parquet/schema"
31 "golang.org/x/xerrors"
32 )
33
// Page header size limits, in bytes. They are not referenced in this part of
// the file; presumably the page reader uses them when sizing header reads.
const (
	// defaultMaxPageHeaderSize is 4 MiB.
	defaultMaxPageHeaderSize = 4 << 20
	// defaultPageHeaderSize is 16 KiB.
	defaultPageHeaderSize = 16 << 10
)
40
41
42
43 func isDictIndexEncoding(e format.Encoding) bool {
44 return e == format.Encoding_RLE_DICTIONARY || e == format.Encoding_PLAIN_DICTIONARY
45 }
46
47
48
49
// CryptoContext groups the decryption state used when reading encrypted
// column data.
//
// NOTE(review): none of these fields are read in this section of the file;
// the per-field descriptions below are inferred from the field names and
// should be confirmed against the page reader that consumes them.
type CryptoContext struct {
	// StartDecryptWithDictionaryPage presumably indicates that the first page
	// to decrypt is a dictionary page — TODO confirm.
	StartDecryptWithDictionaryPage bool
	// RowGroupOrdinal is presumably the index of the row group being read.
	RowGroupOrdinal int16
	// ColumnOrdinal is presumably the index of the column within the row group.
	ColumnOrdinal int16
	// MetaDecryptor presumably decrypts page headers / metadata modules.
	MetaDecryptor encryption.Decryptor
	// DataDecryptor presumably decrypts page payloads.
	DataDecryptor encryption.Decryptor
}
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// ColumnChunkReader is the interface for reading the values of one column
// chunk page by page from a PageReader. NewColumnReader returns a typed
// implementation for each supported parquet physical type; the unexported
// methods are the shared plumbing provided by columnChunkReader.
type ColumnChunkReader interface {
	// HasNext reports whether values remain to be read, loading the next data
	// page from the page reader once the current page has been consumed.
	HasNext() bool
	// Type returns the physical type of the column.
	Type() parquet.Type
	// Descriptor returns the schema descriptor of the column.
	Descriptor() *schema.Column
	// Err returns the reader's error state. When HasNext returns false it
	// should be checked to tell a clean end of data apart from a failure.
	Err() error
	// consumeBufferedValues marks the given number of buffered values in the
	// current page as consumed.
	consumeBufferedValues(int64)
	// numAvailValues returns how many buffered values of the current page
	// have not been consumed yet.
	numAvailValues() int64
	// readDefinitionLevels decodes definition levels into levels and returns
	// the number of levels decoded and the number of values to read from the
	// data decoder. Both are 0 for columns without definition levels.
	readDefinitionLevels(levels []int16) (int, int64)
	// readRepetitionLevels decodes repetition levels into levels and returns
	// how many were decoded (0 for columns without repetition levels).
	readRepetitionLevels(levels []int16) int
	// pager returns the underlying page reader.
	pager() PageReader
	// setPageReader swaps in a new page reader, clearing the error, the cached
	// decoders and the buffered/decoded counts.
	setPageReader(PageReader)
}
114
// columnChunkReader holds the state shared by all typed column chunk readers:
// the page source, level decoders, value decoders and per-page counters.
type columnChunkReader struct {
	descr             *schema.Column
	rdr               PageReader
	repetitionDecoder encoding.LevelDecoder
	definitionDecoder encoding.LevelDecoder

	// curPage is the page most recently returned by rdr.
	curPage Page
	// curEncoding is the (normalized) value encoding of curPage.
	curEncoding format.Encoding
	// curDecoder decodes the values of curPage.
	curDecoder encoding.TypedDecoder

	// numBuffered is the number of values (levels) in the current page.
	numBuffered int64
	// numDecoded is how many of those values have been consumed so far.
	numDecoded int64
	mem        memory.Allocator
	// bufferPool supplies *memory.Buffer scratch space (see skipValues).
	bufferPool *sync.Pool

	// decoders caches one decoder per encoding; it is reset by setPageReader.
	decoders      map[format.Encoding]encoding.TypedDecoder
	decoderTraits encoding.DecoderTraits

	// err is the reader's current error state, exposed via Err.
	err error
	// defLvlBuffer is scratch space reused by getDefLvlBuffer.
	defLvlBuffer []int16

	// newDictionary is set when a dictionary page has been configured.
	newDictionary bool
}
141
142
143
144
145
146
147
148 func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
149 base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
150 switch descr.PhysicalType() {
151 case parquet.Types.FixedLenByteArray:
152 base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
153 return &FixedLenByteArrayColumnChunkReader{base}
154 case parquet.Types.Float:
155 base.decoderTraits = &encoding.Float32DecoderTraits
156 return &Float32ColumnChunkReader{base}
157 case parquet.Types.Double:
158 base.decoderTraits = &encoding.Float64DecoderTraits
159 return &Float64ColumnChunkReader{base}
160 case parquet.Types.ByteArray:
161 base.decoderTraits = &encoding.ByteArrayDecoderTraits
162 return &ByteArrayColumnChunkReader{base}
163 case parquet.Types.Int32:
164 base.decoderTraits = &encoding.Int32DecoderTraits
165 return &Int32ColumnChunkReader{base}
166 case parquet.Types.Int64:
167 base.decoderTraits = &encoding.Int64DecoderTraits
168 return &Int64ColumnChunkReader{base}
169 case parquet.Types.Int96:
170 base.decoderTraits = &encoding.Int96DecoderTraits
171 return &Int96ColumnChunkReader{base}
172 case parquet.Types.Boolean:
173 base.decoderTraits = &encoding.BooleanDecoderTraits
174 return &BooleanColumnChunkReader{base}
175 }
176 return nil
177 }
178
179 func (c *columnChunkReader) Err() error { return c.err }
180 func (c *columnChunkReader) Type() parquet.Type { return c.descr.PhysicalType() }
181 func (c *columnChunkReader) Descriptor() *schema.Column { return c.descr }
182 func (c *columnChunkReader) consumeBufferedValues(n int64) { c.numDecoded += n }
183 func (c *columnChunkReader) numAvailValues() int64 { return c.numBuffered - c.numDecoded }
184 func (c *columnChunkReader) pager() PageReader { return c.rdr }
185 func (c *columnChunkReader) setPageReader(rdr PageReader) {
186 c.rdr, c.err = rdr, nil
187 c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
188 c.numBuffered, c.numDecoded = 0, 0
189 }
190
191 func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
192 if int64(len(c.defLvlBuffer)) < sz {
193 c.defLvlBuffer = make([]int16, sz)
194 return c.defLvlBuffer
195 }
196
197 return c.defLvlBuffer[:sz]
198 }
199
200
201
202 func (c *columnChunkReader) HasNext() bool {
203 if c.numBuffered == 0 || c.numDecoded == c.numBuffered {
204 return c.readNewPage() && c.numBuffered != 0
205 }
206 return true
207 }
208
209 func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
210 enc := page.encoding
211 if enc == format.Encoding_PLAIN_DICTIONARY || enc == format.Encoding_PLAIN {
212 enc = format.Encoding_RLE_DICTIONARY
213 }
214
215 if _, ok := c.decoders[enc]; ok {
216 return xerrors.New("parquet: column chunk cannot have more than one dictionary.")
217 }
218
219 switch page.Encoding() {
220 case format.Encoding_PLAIN, format.Encoding_PLAIN_DICTIONARY:
221 dict := c.decoderTraits.Decoder(parquet.Encodings.Plain, c.descr, false, c.mem)
222 dict.SetData(int(page.NumValues()), page.Data())
223
224 decoder := c.decoderTraits.Decoder(parquet.Encodings.Plain, c.descr, true, c.mem).(encoding.DictDecoder)
225 decoder.SetDict(dict)
226 c.decoders[enc] = decoder
227 default:
228 return xerrors.New("parquet: dictionary index must be plain encoding")
229 }
230
231 c.newDictionary = true
232 c.curDecoder = c.decoders[enc]
233 return nil
234 }
235
236
237 func (c *columnChunkReader) readNewPage() bool {
238 for c.rdr.Next() {
239 c.curPage = c.rdr.Page()
240 if c.curPage == nil {
241 break
242 }
243
244 var lvlByteLen int64
245 switch p := c.curPage.(type) {
246 case *DictionaryPage:
247 if err := c.configureDict(p); err != nil {
248 c.err = err
249 return false
250 }
251 continue
252 case *DataPageV1:
253 lvlByteLen, c.err = c.initLevelDecodersV1(p, p.repLvlEncoding, p.defLvlEncoding)
254 if c.err != nil {
255 return false
256 }
257 case *DataPageV2:
258 lvlByteLen, c.err = c.initLevelDecodersV2(p)
259 if c.err != nil {
260 return false
261 }
262 default:
263
264 continue
265 }
266
267 c.err = c.initDataDecoder(c.curPage, lvlByteLen)
268 return c.err == nil
269 }
270 c.err = c.rdr.Err()
271 return false
272 }
273
274 func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64, error) {
275 c.numBuffered = int64(page.nvals)
276 c.numDecoded = 0
277 buf := page.Data()
278 totalLvlLen := int64(page.repLvlByteLen) + int64(page.defLvlByteLen)
279
280 if totalLvlLen > int64(len(buf)) {
281 return totalLvlLen, xerrors.New("parquet: data page too small for levels (corrupt header?)")
282 }
283
284 if c.descr.MaxRepetitionLevel() > 0 {
285 c.repetitionDecoder.SetDataV2(page.repLvlByteLen, c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
286 }
287
288
289
290
291 buf = buf[page.repLvlByteLen:]
292
293 if c.descr.MaxDefinitionLevel() > 0 {
294 c.definitionDecoder.SetDataV2(page.defLvlByteLen, c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
295 }
296
297 return totalLvlLen, nil
298 }
299
300 func (c *columnChunkReader) initLevelDecodersV1(page *DataPageV1, repLvlEncoding, defLvlEncoding format.Encoding) (int64, error) {
301 c.numBuffered = int64(page.nvals)
302 c.numDecoded = 0
303
304 buf := page.Data()
305 maxSize := len(buf)
306 levelsByteLen := int64(0)
307
308
309
310 if c.descr.MaxRepetitionLevel() > 0 {
311 repBytes, err := c.repetitionDecoder.SetData(parquet.Encoding(repLvlEncoding), c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
312 if err != nil {
313 return levelsByteLen, err
314 }
315 buf = buf[repBytes:]
316 maxSize -= repBytes
317 levelsByteLen += int64(repBytes)
318 }
319
320 if c.descr.MaxDefinitionLevel() > 0 {
321 defBytes, err := c.definitionDecoder.SetData(parquet.Encoding(defLvlEncoding), c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
322 if err != nil {
323 return levelsByteLen, err
324 }
325 levelsByteLen += int64(defBytes)
326 maxSize -= defBytes
327 }
328
329 return levelsByteLen, nil
330 }
331
332 func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
333 buf := page.Data()
334 if int64(len(buf)) < lvlByteLen {
335 return xerrors.New("parquet: page smaller than size of encoded levels")
336 }
337
338 buf = buf[lvlByteLen:]
339 encoding := page.Encoding()
340
341 if isDictIndexEncoding(encoding) {
342 encoding = format.Encoding_RLE_DICTIONARY
343 }
344
345 if decoder, ok := c.decoders[encoding]; ok {
346 c.curDecoder = decoder
347 } else {
348 switch encoding {
349 case format.Encoding_RLE:
350 if c.descr.PhysicalType() != parquet.Types.Boolean {
351 return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
352 }
353 fallthrough
354 case format.Encoding_PLAIN,
355 format.Encoding_DELTA_BYTE_ARRAY,
356 format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
357 format.Encoding_DELTA_BINARY_PACKED:
358 c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
359 c.decoders[encoding] = c.curDecoder
360 case format.Encoding_RLE_DICTIONARY:
361 return errors.New("parquet: dictionary page must be before data page")
362 case format.Encoding_BYTE_STREAM_SPLIT:
363 return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
364 default:
365 return fmt.Errorf("parquet: unknown encoding type %s", encoding)
366 }
367 }
368
369 c.curEncoding = encoding
370 c.curDecoder.SetData(int(c.numBuffered), buf)
371 return nil
372 }
373
374
375
376
377
378
379
380
381 func (c *columnChunkReader) readDefinitionLevels(levels []int16) (totalDecoded int, valuesToRead int64) {
382 if c.descr.MaxDefinitionLevel() == 0 {
383 return 0, 0
384 }
385
386 return c.definitionDecoder.Decode(levels)
387 }
388
389
390
391
392
393
394
395 func (c *columnChunkReader) readRepetitionLevels(levels []int16) int {
396 if c.descr.MaxRepetitionLevel() == 0 {
397 return 0
398 }
399
400 nlevels, _ := c.repetitionDecoder.Decode(levels)
401 return nlevels
402 }
403
404
405
406
407
408
409
410
411
412
413
414
415 func (c *columnChunkReader) determineNumToRead(batchLen int64, defLvls, repLvls []int16) (ndefs int, toRead int64, err error) {
416 if !c.HasNext() {
417 return 0, 0, c.err
418 }
419
420 size := utils.Min(batchLen, c.numBuffered-c.numDecoded)
421
422 if c.descr.MaxDefinitionLevel() > 0 {
423 if defLvls == nil {
424 defLvls = c.getDefLvlBuffer(size)
425 }
426 ndefs, toRead = c.readDefinitionLevels(defLvls[:size])
427 } else {
428 toRead = size
429 }
430
431 if c.descr.MaxRepetitionLevel() > 0 && repLvls != nil {
432 nreps := c.readRepetitionLevels(repLvls[:size])
433 if defLvls != nil && ndefs != nreps {
434 err = xerrors.New("parquet: number of decoded rep/def levels did not match")
435 }
436 }
437 return
438 }
439
440
441
442
443 func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
444 var err error
445 toskip := nvalues
446 for c.HasNext() && toskip > 0 {
447
448 if toskip > (c.numBuffered - c.numDecoded) {
449 toskip -= c.numBuffered - c.numDecoded
450 c.numDecoded = c.numBuffered
451 } else {
452 var (
453 batchSize int64 = 1024
454 valsRead int64 = 0
455 )
456
457 scratch := c.bufferPool.Get().(*memory.Buffer)
458 defer func() {
459 scratch.ResizeNoShrink(0)
460 c.bufferPool.Put(scratch)
461 }()
462 bufMult := 1
463 if c.descr.PhysicalType() == parquet.Types.Boolean {
464
465 bufMult = 8
466 }
467 scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
468
469 for {
470 batchSize = utils.Min(batchSize, toskip)
471 valsRead, err = readFn(batchSize, scratch.Buf())
472 toskip -= valsRead
473 if valsRead <= 0 || toskip <= 0 || err != nil {
474 break
475 }
476 }
477 }
478 }
479 if c.err != nil {
480 err = c.err
481 }
482 return nvalues - toskip, err
483 }
484
// readerFunc decodes count values into the caller's output starting at
// offset start, returning how many values were actually decoded.
type readerFunc func(start, count int64) (int, error)
486
487
488
489
490
491
492
493 func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls []int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
494 var (
495 read int
496 defs []int16
497 reps []int16
498 ndefs int
499 toRead int64
500 )
501
502 for c.HasNext() && totalLvls < batchSize && err == nil {
503 if defLvls != nil {
504 defs = defLvls[totalLvls:]
505 }
506 if repLvls != nil {
507 reps = repLvls[totalLvls:]
508 }
509 ndefs, toRead, err = c.determineNumToRead(batchSize-totalLvls, defs, reps)
510 if err != nil {
511 return totalLvls, totalRead, err
512 }
513
514 read, err = readFn(int64(totalRead), toRead)
515
516
517
518
519
520 totalVals := int64(utils.Max(ndefs, read))
521 c.consumeBufferedValues(totalVals)
522
523 totalLvls += totalVals
524 totalRead += read
525 }
526 return totalLvls, totalRead, err
527 }
528
View as plain text