1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "fmt"
21 "sync"
22
23 "github.com/apache/arrow/go/v15/internal/utils"
24 "github.com/apache/arrow/go/v15/parquet"
25 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
26 "github.com/apache/arrow/go/v15/parquet/metadata"
27 "golang.org/x/xerrors"
28 )
29
// maxDictHeaderSize is the upper bound, in bytes, on the padding that
// GetColumnPageReader adds to a column chunk's read length for files whose
// writer version is older than metadata.Parquet816FixedVersion.
const maxDictHeaderSize int64 = 100
33
34
35 type RowGroupReader struct {
36 r parquet.ReaderAtSeeker
37 sourceSz int64
38 fileMetadata *metadata.FileMetaData
39 rgMetadata *metadata.RowGroupMetaData
40 props *parquet.ReaderProperties
41 fileDecryptor encryption.FileDecryptor
42
43 bufferPool *sync.Pool
44 }
45
46
47 func (r *RowGroupReader) MetaData() *metadata.RowGroupMetaData { return r.rgMetadata }
48
49
50 func (r *RowGroupReader) NumColumns() int { return r.rgMetadata.NumColumns() }
51
52
53 func (r *RowGroupReader) NumRows() int64 { return r.rgMetadata.NumRows() }
54
55
56 func (r *RowGroupReader) ByteSize() int64 { return r.rgMetadata.TotalByteSize() }
57
58
59
60
61 func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
62 if i >= r.NumColumns() || i < 0 {
63 return nil, fmt.Errorf("parquet: trying to read column index %d but row group metadata only has %d columns", i, r.rgMetadata.NumColumns())
64 }
65
66 descr := r.fileMetadata.Schema.Column(i)
67 pageRdr, err := r.GetColumnPageReader(i)
68 if err != nil {
69 return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
70 }
71 return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
72 }
73
74 func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
75 col, err := r.rgMetadata.ColumnChunk(i)
76 if err != nil {
77 return nil, err
78 }
79
80 colStart := col.DataPageOffset()
81 if col.HasDictionaryPage() && col.DictionaryPageOffset() > 0 && colStart > col.DictionaryPageOffset() {
82 colStart = col.DictionaryPageOffset()
83 }
84
85 colLen := col.TotalCompressedSize()
86
87 if r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
88
89
90
91 if colStart < 0 || colLen < 0 {
92 return nil, fmt.Errorf("invalid column chunk metadata, offset (%d) and length (%d) should both be positive", colStart, colLen)
93 }
94 if colStart > r.sourceSz || colLen > r.sourceSz {
95 return nil, fmt.Errorf("invalid column chunk metadata, offset (%d) and length (%d) must both be less than total source size (%d)", colStart, colLen, r.sourceSz)
96 }
97 bytesRemain := r.sourceSz - (colStart + colLen)
98 padding := utils.Min(maxDictHeaderSize, bytesRemain)
99 colLen += padding
100 }
101
102 stream, err := r.props.GetStream(r.r, colStart, colLen)
103 if err != nil {
104 return nil, err
105 }
106
107 cryptoMetadata := col.CryptoMetadata()
108 if cryptoMetadata == nil {
109 return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), nil)
110 }
111
112 if r.fileDecryptor == nil {
113 return nil, xerrors.New("column in rowgroup is encrypted, but no file decryptor")
114 }
115
116 const encryptedRowGroupsLimit = 32767
117 if i > encryptedRowGroupsLimit {
118 return nil, xerrors.New("encrypted files cannot contain more than 32767 column chunks")
119 }
120
121 if cryptoMetadata.IsSetENCRYPTION_WITH_FOOTER_KEY() {
122 ctx := CryptoContext{
123 StartDecryptWithDictionaryPage: col.HasDictionaryPage(),
124 RowGroupOrdinal: r.rgMetadata.Ordinal(),
125 ColumnOrdinal: int16(i),
126 MetaDecryptor: r.fileDecryptor.GetFooterDecryptorForColumnMeta(""),
127 DataDecryptor: r.fileDecryptor.GetFooterDecryptorForColumnData(""),
128 }
129 return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
130 }
131
132
133 columnKeyMeta := cryptoMetadata.GetENCRYPTION_WITH_COLUMN_KEY().KeyMetadata
134 columnPath := cryptoMetadata.GetENCRYPTION_WITH_COLUMN_KEY().PathInSchema
135
136 ctx := CryptoContext{
137 StartDecryptWithDictionaryPage: col.HasDictionaryPage(),
138 RowGroupOrdinal: r.rgMetadata.Ordinal(),
139 ColumnOrdinal: int16(i),
140 MetaDecryptor: r.fileDecryptor.GetColumnMetaDecryptor(parquet.ColumnPath(columnPath).String(), string(columnKeyMeta), ""),
141 DataDecryptor: r.fileDecryptor.GetColumnDataDecryptor(parquet.ColumnPath(columnPath).String(), string(columnKeyMeta), ""),
142 }
143 return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
144 }
145
View as plain text