17 package file
18
19 import (
20 "bytes"
21 "fmt"
22 "io"
23 "sync"
24
25 "github.com/JohnCGriffin/overflow"
26 "github.com/apache/arrow/go/v15/arrow/memory"
27 "github.com/apache/arrow/go/v15/parquet"
28 "github.com/apache/arrow/go/v15/parquet/compress"
29 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
30 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
31 "github.com/apache/arrow/go/v15/parquet/internal/thrift"
32 "github.com/apache/arrow/go/v15/parquet/metadata"
33 "golang.org/x/xerrors"
34 )
35
36
37
// PageReader is the interface for sequentially reading the pages of a
// column chunk, handling header deserialization, decryption and
// decompression of the payloads.
type PageReader interface {
	// SetMaxPageHeaderSize sets the maximum number of bytes a serialized
	// page header may occupy; Next fails if a header cannot be
	// deserialized within this limit.
	SetMaxPageHeaderSize(int)
	// Page returns the page most recently read by a successful Next call,
	// or nil before the first successful call.
	Page() Page
	// Next advances to the next page, returning true on success. When it
	// returns false, Err reports the cause (nil on normal end of data).
	Next() bool
	// Err returns the error, if any, that caused Next to return false.
	Err() error
	// Reset re-targets the reader at a new stream / row count / codec /
	// crypto context so the same reader object can be reused.
	Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext)
}
51
52
// Page is the common interface implemented by data pages and dictionary
// pages.
type Page interface {
	// Type returns the thrift page type tag (data page v1/v2, dictionary, ...).
	Type() format.PageType
	// Data returns the raw page payload bytes.
	Data() []byte
	// Encoding returns the encoding used for the page's values.
	Encoding() format.Encoding
	// NumValues is the number of values encoded in the page.
	NumValues() int32
	// Release returns the page's memory for reuse; the page must not be
	// used after calling it.
	Release()
}
65
// page is the embedded base shared by all concrete page implementations;
// it carries the payload buffer and the header fields common to every
// page type.
type page struct {
	buf *memory.Buffer  // page payload
	typ format.PageType // thrift page type tag

	nvals    int32           // number of encoded values
	encoding format.Encoding // value encoding
}

// Accessors for the common Page interface methods.
func (p *page) Type() format.PageType { return p.typ }
func (p *page) Data() []byte { return p.buf.Bytes() }
func (p *page) NumValues() int32 { return p.nvals }
func (p *page) Encoding() format.Encoding { return p.encoding }
78
79
80
// DataPage is the interface implemented by both DataPageV1 and DataPageV2,
// extending Page with the accessors that only data pages provide.
type DataPage interface {
	Page
	// UncompressedSize is the size in bytes of the page data when uncompressed.
	UncompressedSize() int32
	// Statistics returns the encoded statistics carried in the page header
	// (individual fields may be unset).
	Statistics() metadata.EncodedStatistics
}
86
87
88
89
90
91
92
// Pools of released page objects so repeated page reads can recycle the
// structs instead of allocating fresh ones. New deliberately returns a
// typed nil pointer: the constructors test for nil after Get and allocate
// a brand-new page in that case.
var dataPageV1Pool = sync.Pool{
	New: func() interface{} { return (*DataPageV1)(nil) },
}

var dataPageV2Pool = sync.Pool{
	New: func() interface{} { return (*DataPageV2)(nil) },
}

var dictPagePool = sync.Pool{
	New: func() interface{} { return (*DictionaryPage)(nil) },
}
104
105
// DataPageV1 represents a parquet format v1 DATA_PAGE together with its
// payload and header metadata.
type DataPageV1 struct {
	page

	defLvlEncoding   format.Encoding // encoding of the definition levels
	repLvlEncoding   format.Encoding // encoding of the repetition levels
	uncompressedSize int32           // payload size after decompression
	statistics       metadata.EncodedStatistics
}
114
115
116
117
118
119
120 func NewDataPageV1(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int32) *DataPageV1 {
121 dp := dataPageV1Pool.Get().(*DataPageV1)
122 if dp == nil {
123 return &DataPageV1{
124 page: page{buf: buffer, typ: format.PageType_DATA_PAGE, nvals: num, encoding: format.Encoding(encoding)},
125 defLvlEncoding: format.Encoding(defEncoding),
126 repLvlEncoding: format.Encoding(repEncoding),
127 uncompressedSize: uncompressedSize,
128 }
129 }
130
131 dp.buf, dp.nvals = buffer, num
132 dp.encoding = format.Encoding(encoding)
133 dp.defLvlEncoding, dp.repLvlEncoding = format.Encoding(defEncoding), format.Encoding(repEncoding)
134 dp.statistics.HasMax, dp.statistics.HasMin = false, false
135 dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false
136 dp.uncompressedSize = uncompressedSize
137 return dp
138 }
139
140
141 func NewDataPageV1WithStats(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int32, stats metadata.EncodedStatistics) *DataPageV1 {
142 ret := NewDataPageV1(buffer, num, encoding, defEncoding, repEncoding, uncompressedSize)
143 ret.statistics = stats
144 return ret
145 }
146
147
148
149
150
// Release gives the page's buffer back to the allocator and returns the
// page struct itself to the pool for reuse. The page must not be used
// after calling Release.
func (d *DataPageV1) Release() {
	d.buf.Release()
	d.buf = nil
	dataPageV1Pool.Put(d)
}
156
157
// UncompressedSize returns the size in bytes of the page data when uncompressed.
func (d *DataPageV1) UncompressedSize() int32 { return d.uncompressedSize }

// Statistics returns the encoded statistics from the page header.
func (d *DataPageV1) Statistics() metadata.EncodedStatistics { return d.statistics }

// DefinitionLevelEncoding returns the encoding used for the definition levels.
func (d *DataPageV1) DefinitionLevelEncoding() parquet.Encoding {
	return parquet.Encoding(d.defLvlEncoding)
}

// RepetitionLevelEncoding returns the encoding used for the repetition levels.
func (d *DataPageV1) RepetitionLevelEncoding() parquet.Encoding {
	return parquet.Encoding(d.repLvlEncoding)
}
172
173
// DataPageV2 represents a parquet format v2 DATA_PAGE_V2 together with its
// payload and header metadata. In V2 the level sections sit at the front
// of the payload and are read uncompressed (see Next); only the values
// section may be compressed.
type DataPageV2 struct {
	page

	nulls            int32 // number of null values reported by the header
	nrows            int32 // number of rows in this page
	defLvlByteLen    int32 // byte length of the definition levels section
	repLvlByteLen    int32 // byte length of the repetition levels section
	compressed       bool  // whether the values section is compressed
	uncompressedSize int32
	statistics       metadata.EncodedStatistics
}
185
186
187 func NewDataPageV2(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen, uncompressed int32, isCompressed bool) *DataPageV2 {
188 dp := dataPageV2Pool.Get().(*DataPageV2)
189 if dp == nil {
190 return &DataPageV2{
191 page: page{buf: buffer, typ: format.PageType_DATA_PAGE_V2, nvals: numValues, encoding: format.Encoding(encoding)},
192 nulls: numNulls,
193 nrows: numRows,
194 defLvlByteLen: defLvlsByteLen,
195 repLvlByteLen: repLvlsByteLen,
196 compressed: isCompressed,
197 uncompressedSize: uncompressed,
198 }
199 }
200
201 dp.buf, dp.nvals = buffer, numValues
202 dp.encoding = format.Encoding(encoding)
203 dp.nulls, dp.nrows = numNulls, numRows
204 dp.defLvlByteLen, dp.repLvlByteLen = defLvlsByteLen, repLvlsByteLen
205 dp.compressed, dp.uncompressedSize = isCompressed, uncompressed
206 dp.statistics.HasMax, dp.statistics.HasMin = false, false
207 dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false
208 return dp
209 }
210
211
212 func NewDataPageV2WithStats(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen, uncompressed int32, isCompressed bool, stats metadata.EncodedStatistics) *DataPageV2 {
213 ret := NewDataPageV2(buffer, numValues, numNulls, numRows, encoding, defLvlsByteLen, repLvlsByteLen, uncompressed, isCompressed)
214 ret.statistics = stats
215 return ret
216 }
217
218
219
220
221
// Release gives the page's buffer back to the allocator and returns the
// page struct itself to the pool for reuse. The page must not be used
// after calling Release.
func (d *DataPageV2) Release() {
	d.buf.Release()
	d.buf = nil
	dataPageV2Pool.Put(d)
}
227
228
229
// UncompressedSize returns the size in bytes of the page data when uncompressed.
func (d *DataPageV2) UncompressedSize() int32 { return d.uncompressedSize }

// Statistics returns the encoded statistics from the page header.
func (d *DataPageV2) Statistics() metadata.EncodedStatistics { return d.statistics }

// NumNulls is the number of null values reported by the page header.
func (d *DataPageV2) NumNulls() int32 { return d.nulls }

// NumRows is the number of rows contained in this page.
func (d *DataPageV2) NumRows() int32 { return d.nrows }

// DefinitionLevelByteLen is the byte length of the definition levels section.
func (d *DataPageV2) DefinitionLevelByteLen() int32 { return d.defLvlByteLen }

// RepetitionLevelByteLen is the byte length of the repetition levels section.
func (d *DataPageV2) RepetitionLevelByteLen() int32 { return d.repLvlByteLen }

// IsCompressed reports whether the values section of this page is compressed.
func (d *DataPageV2) IsCompressed() bool { return d.compressed }
249
250
// DictionaryPage represents a parquet DICTIONARY_PAGE: the dictionary of
// values referenced by dictionary-encoded data pages in the same column chunk.
type DictionaryPage struct {
	page

	sorted bool // whether the header declared the dictionary values sorted
}
256
257
258 func NewDictionaryPage(buffer *memory.Buffer, nvals int32, encoding parquet.Encoding) *DictionaryPage {
259 dp := dictPagePool.Get().(*DictionaryPage)
260 if dp == nil {
261 return &DictionaryPage{
262 page: page{
263 buf: buffer,
264 typ: format.PageType_DICTIONARY_PAGE,
265 nvals: nvals,
266 encoding: format.Encoding(encoding),
267 },
268 }
269 }
270
271 dp.buf = buffer
272 dp.nvals = nvals
273 dp.encoding = format.Encoding(encoding)
274 dp.sorted = false
275 return dp
276 }
277
278
279
280
281
// Release gives the page's buffer back to the allocator and returns the
// page struct itself to the pool for reuse. The page must not be used
// after calling Release.
func (d *DictionaryPage) Release() {
	d.buf.Release()
	d.buf = nil
	dictPagePool.Put(d)
}

// IsSorted reports whether the page header declared the dictionary values sorted.
func (d *DictionaryPage) IsSorted() bool { return d.sorted }
290
// serializedPageReader is the default PageReader implementation. It reads
// thrift-serialized page headers and (optionally encrypted, optionally
// compressed) page payloads from a buffered stream.
type serializedPageReader struct {
	r        parquet.BufferedReader // underlying source stream
	nrows    int64                  // total rows expected; Next stops once rowsSeen reaches this
	rowsSeen int64                  // rows consumed from data pages so far
	mem      memory.Allocator
	codec    compress.Codec

	curPageHdr        *format.PageHeader // header of the page currently being processed
	pageOrd           int16              // data page ordinal, used when updating the page AAD
	maxPageHeaderSize int                // cap on serialized header size accepted by Next

	curPage           Page
	cryptoCtx         CryptoContext
	dataPageAad       string // precomputed AAD for data page payload decryption
	dataPageHeaderAad string // precomputed AAD for data page header decryption

	decompressBuffer bytes.Buffer // scratch space for compressed page bytes
	err              error        // sticky error reported via Err
}
310
311
312 func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
313 if mem == nil {
314 mem = memory.NewGoAllocator()
315 }
316
317 codec, err := compress.GetCodec(compressType)
318 if err != nil {
319 return nil, err
320 }
321
322 rdr := &serializedPageReader{
323 r: r,
324 maxPageHeaderSize: defaultMaxPageHeaderSize,
325 nrows: nrows,
326 mem: mem,
327 codec: codec,
328 }
329 rdr.decompressBuffer.Grow(defaultPageHeaderSize)
330 if ctx != nil {
331 rdr.cryptoCtx = *ctx
332 rdr.initDecryption()
333 }
334 return rdr, nil
335 }
336
337 func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
338 p.rowsSeen, p.pageOrd, p.nrows = 0, 0, nrows
339 p.curPageHdr, p.curPage, p.err = nil, nil, nil
340 p.r = r
341
342 p.codec, p.err = compress.GetCodec(compressType)
343 if p.err != nil {
344 return
345 }
346 p.decompressBuffer.Reset()
347 if ctx != nil {
348 p.cryptoCtx = *ctx
349 p.initDecryption()
350 } else {
351 p.cryptoCtx = CryptoContext{}
352 p.dataPageAad = ""
353 p.dataPageHeaderAad = ""
354 }
355 }
356
// Err returns the error, if any, that caused Next to return false.
func (p *serializedPageReader) Err() error { return p.err }

// SetMaxPageHeaderSize sets the maximum number of bytes a serialized page
// header may occupy before Next gives up deserializing it.
func (p *serializedPageReader) SetMaxPageHeaderSize(sz int) {
	p.maxPageHeaderSize = sz
}
362
// initDecryption precomputes the module AAD strings for data page payloads
// and data page headers from the crypto context. The page ordinal portion
// (-1 here) is patched in per page by updateDecryption.
func (p *serializedPageReader) initDecryption() {
	if p.cryptoCtx.DataDecryptor != nil {
		p.dataPageAad = encryption.CreateModuleAad(p.cryptoCtx.DataDecryptor.FileAad(), encryption.DataPageModule,
			p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1)
	}
	if p.cryptoCtx.MetaDecryptor != nil {
		p.dataPageHeaderAad = encryption.CreateModuleAad(p.cryptoCtx.MetaDecryptor.FileAad(), encryption.DataPageHeaderModule,
			p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1)
	}
}
373
// updateDecryption refreshes decrypt's AAD for the next module to be
// decrypted. While a dictionary page is still expected, the AAD is rebuilt
// from scratch for the given moduleType; otherwise the precomputed page AAD
// is updated in place with the current data page ordinal.
func (p *serializedPageReader) updateDecryption(decrypt encryption.Decryptor, moduleType int8, pageAad string) {
	if p.cryptoCtx.StartDecryptWithDictionaryPage {
		aad := encryption.CreateModuleAad(decrypt.FileAad(), moduleType, p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1)
		decrypt.UpdateAad(aad)
	} else {
		// Copy the cached AAD so the stored prefix itself is not mutated.
		pageaad := []byte(pageAad)
		encryption.QuickUpdatePageAad(pageaad, p.pageOrd)
		decrypt.UpdateAad(string(pageaad))
	}
}
384
// Page returns the page most recently read by a successful Next call, or
// nil before the first successful call.
func (p *serializedPageReader) Page() Page {
	return p.curPage
}
388
// decompress reads lenCompressed bytes from the stream into the scratch
// buffer, decrypts them when a data decryptor is configured, and decodes
// the result into buf, returning the decompressed bytes.
//
// NOTE(review): the returned slice appears to alias either buf or the
// scratch buffer depending on the codec — callers should not rely on it
// surviving the next decompress call; confirm against the codec
// implementations.
func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) {
	// The scratch buffer was Reset by the caller; Grow then CopyN fills it
	// with exactly the compressed page bytes.
	p.decompressBuffer.Grow(lenCompressed)
	if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); err != nil {
		return nil, err
	}

	data := p.decompressBuffer.Bytes()
	if p.cryptoCtx.DataDecryptor != nil {
		data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
	}

	return p.codec.Decode(buf, data), nil
}
402
// dataheader abstracts over the v1 and v2 thrift data page headers, both
// of which carry optional statistics.
type dataheader interface {
	IsSetStatistics() bool
	GetStatistics() *format.Statistics
}
407
408 func extractStats(dataHeader dataheader) (pageStats metadata.EncodedStatistics) {
409 if dataHeader.IsSetStatistics() {
410 stats := dataHeader.GetStatistics()
411 if stats.IsSetMaxValue() {
412 pageStats.SetMax(stats.GetMaxValue())
413 } else if stats.IsSetMax() {
414 pageStats.SetMax(stats.GetMax())
415 }
416 if stats.IsSetMinValue() {
417 pageStats.SetMin(stats.GetMinValue())
418 } else if stats.IsSetMin() {
419 pageStats.SetMin(stats.GetMin())
420 }
421
422 if stats.IsSetNullCount() {
423 pageStats.SetNullCount(stats.GetNullCount())
424 }
425 if stats.IsSetDistinctCount() {
426 pageStats.SetDistinctCount(stats.GetDistinctCount())
427 }
428 }
429 return
430 }
431
432 func (p *serializedPageReader) Next() bool {
433
434
435 if p.curPage != nil {
436 p.curPage.Release()
437 }
438 p.curPage = nil
439 p.curPageHdr = format.NewPageHeader()
440 p.err = nil
441
442 for p.rowsSeen < p.nrows {
443 allowedPgSz := defaultPageHeaderSize
444 p.decompressBuffer.Reset()
445 for {
446 view, err := p.r.Peek(allowedPgSz)
447 if err != nil && err != io.EOF {
448 p.err = err
449 return false
450 }
451
452 if len(view) == 0 {
453 return false
454 }
455
456 extra := 0
457 if p.cryptoCtx.MetaDecryptor != nil {
458 p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
459 view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
460 extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
461 }
462
463 remaining, err := thrift.DeserializeThrift(p.curPageHdr, view)
464 if err != nil {
465 allowedPgSz *= 2
466 if allowedPgSz > p.maxPageHeaderSize {
467 p.err = xerrors.New("parquet: deserializing page header failed")
468 return false
469 }
470 continue
471 }
472
473 p.r.Discard(len(view) - int(remaining) + extra)
474 break
475 }
476
477 lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
478 lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
479 if lenCompressed < 0 || lenUncompressed < 0 {
480 p.err = xerrors.New("parquet: invalid page header")
481 return false
482 }
483
484 if p.cryptoCtx.DataDecryptor != nil {
485 p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
486 }
487
488 buf := memory.NewResizableBuffer(p.mem)
489 defer buf.Release()
490 buf.ResizeNoShrink(lenUncompressed)
491
492 switch p.curPageHdr.GetType() {
493 case format.PageType_DICTIONARY_PAGE:
494 p.cryptoCtx.StartDecryptWithDictionaryPage = false
495 dictHeader := p.curPageHdr.GetDictionaryPageHeader()
496 if dictHeader.GetNumValues() < 0 {
497 p.err = xerrors.New("parquet: invalid page header (negative number of values)")
498 return false
499 }
500
501 data, err := p.decompress(lenCompressed, buf.Bytes())
502 if err != nil {
503 p.err = err
504 return false
505 }
506 if len(data) != lenUncompressed {
507 p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
508 return false
509 }
510
511
512 p.curPage = &DictionaryPage{
513 page: page{
514 buf: memory.NewBufferBytes(data),
515 typ: p.curPageHdr.Type,
516 nvals: dictHeader.GetNumValues(),
517 encoding: dictHeader.GetEncoding(),
518 },
519 sorted: dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted(),
520 }
521
522 case format.PageType_DATA_PAGE:
523 p.pageOrd++
524 dataHeader := p.curPageHdr.GetDataPageHeader()
525 if dataHeader.GetNumValues() < 0 {
526 p.err = xerrors.New("parquet: invalid page header (negative number of values)")
527 return false
528 }
529
530 p.rowsSeen += int64(dataHeader.GetNumValues())
531 data, err := p.decompress(lenCompressed, buf.Bytes())
532 if err != nil {
533 p.err = err
534 return false
535 }
536 if len(data) != lenUncompressed {
537 p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
538 return false
539 }
540
541
542 p.curPage = &DataPageV1{
543 page: page{
544 buf: memory.NewBufferBytes(data),
545 typ: p.curPageHdr.Type,
546 nvals: dataHeader.GetNumValues(),
547 encoding: dataHeader.GetEncoding(),
548 },
549 defLvlEncoding: dataHeader.GetDefinitionLevelEncoding(),
550 repLvlEncoding: dataHeader.GetRepetitionLevelEncoding(),
551 uncompressedSize: int32(lenUncompressed),
552 statistics: extractStats(dataHeader),
553 }
554 case format.PageType_DATA_PAGE_V2:
555 p.pageOrd++
556 dataHeader := p.curPageHdr.GetDataPageHeaderV2()
557 if dataHeader.GetNumValues() < 0 {
558 p.err = xerrors.New("parquet: invalid page header (negative number of values)")
559 return false
560 }
561
562 if dataHeader.GetDefinitionLevelsByteLength() < 0 || dataHeader.GetRepetitionLevelsByteLength() < 0 {
563 p.err = xerrors.New("parquet: invalid page header (negative levels byte length)")
564 return false
565 }
566
567 compressed := dataHeader.GetIsCompressed()
568
569 p.rowsSeen += int64(dataHeader.GetNumValues())
570 levelsBytelen, ok := overflow.Add(int(dataHeader.GetDefinitionLevelsByteLength()), int(dataHeader.GetRepetitionLevelsByteLength()))
571 if !ok {
572 p.err = xerrors.New("parquet: levels size too large (corrupt file?)")
573 return false
574 }
575
576 if compressed {
577 if levelsBytelen > 0 {
578 io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
579 }
580 if _, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
581 return false
582 }
583 } else {
584 io.ReadFull(p.r, buf.Bytes())
585 }
586 buf.Retain()
587
588 if buf.Len() != lenUncompressed {
589 p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
590 return false
591 }
592
593
594 p.curPage = &DataPageV2{
595 page: page{
596 buf: buf,
597 typ: p.curPageHdr.Type,
598 nvals: dataHeader.GetNumValues(),
599 encoding: dataHeader.GetEncoding(),
600 },
601 nulls: dataHeader.GetNumNulls(),
602 nrows: dataHeader.GetNumRows(),
603 defLvlByteLen: dataHeader.GetDefinitionLevelsByteLength(),
604 repLvlByteLen: dataHeader.GetRepetitionLevelsByteLength(),
605 compressed: compressed,
606 uncompressedSize: int32(lenUncompressed),
607 statistics: extractStats(dataHeader),
608 }
609 default:
610
611 continue
612 }
613 return true
614 }
615
616 return false
617 }
618