1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "fmt"
23 "io"
24 "os"
25 "runtime"
26 "sync"
27
28 "github.com/apache/arrow/go/v15/arrow/memory"
29 "github.com/apache/arrow/go/v15/parquet"
30 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
31 "github.com/apache/arrow/go/v15/parquet/metadata"
32 "golang.org/x/xerrors"
33 )
34
// footerSize is the number of trailing bytes read from the end of the file
// when locating the metadata: a 4-byte little-endian metadata length followed
// by the 4 magic bytes.
const footerSize uint32 = 8
38
39 var (
40 magicBytes = []byte("PAR1")
41 magicEBytes = []byte("PARE")
42 errInconsistentFileMetadata = xerrors.New("parquet: file is smaller than indicated metadata size")
43 )
44
45
46 type Reader struct {
47 r parquet.ReaderAtSeeker
48 props *parquet.ReaderProperties
49 metadata *metadata.FileMetaData
50 footerOffset int64
51 fileDecryptor encryption.FileDecryptor
52
53 bufferPool sync.Pool
54 }
55
56 type ReadOption func(*Reader)
57
58
59
60 func WithReadProps(props *parquet.ReaderProperties) ReadOption {
61 return func(r *Reader) {
62 r.props = props
63 }
64 }
65
66
67
68 func WithMetadata(m *metadata.FileMetaData) ReadOption {
69 return func(r *Reader) {
70 r.metadata = m
71 }
72 }
73
74
75
76
77
78
79 func OpenParquetFile(filename string, memoryMap bool, opts ...ReadOption) (*Reader, error) {
80 var source parquet.ReaderAtSeeker
81
82 var err error
83 if memoryMap {
84 source, err = mmapOpen(filename)
85 if err != nil {
86 return nil, err
87 }
88 } else {
89 source, err = os.Open(filename)
90 if err != nil {
91 return nil, err
92 }
93 }
94 return NewParquetReader(source, opts...)
95 }
96
97
98
99
100
101
102 func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, error) {
103 var err error
104 f := &Reader{r: r}
105 for _, o := range opts {
106 o(f)
107 }
108
109 if f.footerOffset <= 0 {
110 f.footerOffset, err = r.Seek(0, io.SeekEnd)
111 if err != nil {
112 return nil, fmt.Errorf("parquet: could not retrieve footer offset: %w", err)
113 }
114 }
115
116 if f.props == nil {
117 f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
118 }
119
120 f.bufferPool = sync.Pool{
121 New: func() interface{} {
122 buf := memory.NewResizableBuffer(f.props.Allocator())
123 runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
124 obj.Release()
125 })
126 return buf
127 },
128 }
129
130 if f.metadata == nil {
131 return f, f.parseMetaData()
132 }
133
134 return f, nil
135 }
136
137
138
139
140
141 func (f *Reader) BufferPool() *sync.Pool {
142 return &f.bufferPool
143 }
144
145
146
147 func (f *Reader) Close() error {
148 if r, ok := f.r.(io.Closer); ok {
149 return r.Close()
150 }
151 return nil
152 }
153
154
155 func (f *Reader) MetaData() *metadata.FileMetaData { return f.metadata }
156
157
158 func (f *Reader) parseMetaData() error {
159 if f.footerOffset <= int64(footerSize) {
160 return fmt.Errorf("parquet: file too small (size=%d)", f.footerOffset)
161 }
162
163 buf := make([]byte, footerSize)
164
165 n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
166 if err != nil && err != io.EOF {
167 return fmt.Errorf("parquet: could not read footer: %w", err)
168 }
169 if n != len(buf) {
170 return fmt.Errorf("parquet: could not read %d bytes from end of file", len(buf))
171 }
172
173 size := int64(binary.LittleEndian.Uint32(buf[:4]))
174 if size < 0 || size+int64(footerSize) > f.footerOffset {
175 return errInconsistentFileMetadata
176 }
177
178 fileDecryptProps := f.props.FileDecryptProps
179
180 switch {
181 case bytes.Equal(buf[4:], magicBytes):
182 buf = make([]byte, size)
183 if _, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize)-size); err != nil {
184 return fmt.Errorf("parquet: could not read footer: %w", err)
185 }
186
187 f.metadata, err = metadata.NewFileMetaData(buf, nil)
188 if err != nil {
189 return fmt.Errorf("parquet: could not read footer: %w", err)
190 }
191
192 if !f.metadata.IsSetEncryptionAlgorithm() {
193 if fileDecryptProps != nil && !fileDecryptProps.PlaintextFilesAllowed() {
194 return fmt.Errorf("parquet: applying decryption properties on plaintext file")
195 }
196 } else {
197 if err := f.parseMetaDataEncryptedFilePlaintextFooter(fileDecryptProps, buf); err != nil {
198 return err
199 }
200 }
201 case bytes.Equal(buf[4:], magicEBytes):
202 buf = make([]byte, size)
203 if _, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize)-size); err != nil {
204 return fmt.Errorf("parquet: could not read footer: %w", err)
205 }
206
207 if fileDecryptProps == nil {
208 return xerrors.New("could not read encrypted metadata, no decryption found in reader's properties")
209 }
210
211 fileCryptoMetadata, err := metadata.NewFileCryptoMetaData(buf)
212 if err != nil {
213 return err
214 }
215 algo := fileCryptoMetadata.EncryptionAlgorithm()
216 fileAad, err := f.handleAadPrefix(fileDecryptProps, &algo)
217 if err != nil {
218 return err
219 }
220 f.fileDecryptor = encryption.NewFileDecryptor(fileDecryptProps, fileAad, algo.Algo, string(fileCryptoMetadata.KeyMetadata()), f.props.Allocator())
221
222 f.metadata, err = metadata.NewFileMetaData(buf[fileCryptoMetadata.Len():], f.fileDecryptor)
223 if err != nil {
224 return fmt.Errorf("parquet: could not read footer: %w", err)
225 }
226 default:
227 return fmt.Errorf("parquet: magic bytes not found in footer. Either the file is corrupted or this isn't a parquet file")
228 }
229
230 return nil
231 }
232
233 func (f *Reader) handleAadPrefix(fileDecrypt *parquet.FileDecryptionProperties, algo *parquet.Algorithm) (string, error) {
234 aadPrefixInProps := fileDecrypt.AadPrefix()
235 aadPrefix := []byte(aadPrefixInProps)
236 fileHasAadPrefix := algo.Aad.AadPrefix != nil && len(algo.Aad.AadPrefix) > 0
237 aadPrefixInFile := algo.Aad.AadPrefix
238
239 if algo.Aad.SupplyAadPrefix && aadPrefixInProps == "" {
240 return "", xerrors.New("AAD Prefix used for file encryption but not stored in file and not supplied in decryption props")
241 }
242
243 if fileHasAadPrefix {
244 if aadPrefixInProps != "" {
245 if aadPrefixInProps != string(aadPrefixInFile) {
246 return "", xerrors.New("AAD prefix in file and in properties but not the same")
247 }
248 }
249 aadPrefix = aadPrefixInFile
250 if fileDecrypt.Verifier != nil {
251 fileDecrypt.Verifier.Verify(string(aadPrefix))
252 }
253 } else {
254 if !algo.Aad.SupplyAadPrefix && aadPrefixInProps != "" {
255 return "", xerrors.New("AAD Prefix set in decryptionproperties but was not used for file encryption")
256 }
257 if fileDecrypt.Verifier != nil {
258 return "", xerrors.New("AAD Prefix Verifier is set but AAD Prefix not found in file")
259 }
260 }
261 return string(append(aadPrefix, algo.Aad.AadFileUnique...)), nil
262 }
263
264 func (f *Reader) parseMetaDataEncryptedFilePlaintextFooter(decryptProps *parquet.FileDecryptionProperties, data []byte) error {
265 if decryptProps != nil {
266 algo := f.metadata.EncryptionAlgorithm()
267 fileAad, err := f.handleAadPrefix(decryptProps, &algo)
268 if err != nil {
269 return err
270 }
271 f.fileDecryptor = encryption.NewFileDecryptor(decryptProps, fileAad, algo.Algo, string(f.metadata.GetFooterSigningKeyMetadata()), f.props.Allocator())
272
273
274 f.metadata.FileDecryptor = f.fileDecryptor
275 if decryptProps.PlaintextFooterIntegrity() {
276 if len(data)-f.metadata.Size() != encryption.GcmTagLength+encryption.NonceLength {
277 return xerrors.New("failed reading metadata for encryption signature")
278 }
279
280 if !f.metadata.VerifySignature(data[f.metadata.Size():]) {
281 return xerrors.New("parquet crypto signature verification failed")
282 }
283 }
284 }
285 return nil
286 }
287
288
289
290 func (f *Reader) WriterVersion() *metadata.AppVersion {
291 return f.metadata.WriterVersion()
292 }
293
294
295 func (f *Reader) NumRows() int64 {
296 return f.metadata.GetNumRows()
297 }
298
299
300 func (f *Reader) NumRowGroups() int {
301 return len(f.metadata.GetRowGroups())
302 }
303
304
305 func (f *Reader) RowGroup(i int) *RowGroupReader {
306 rg := f.metadata.RowGroups[i]
307
308 return &RowGroupReader{
309 fileMetadata: f.metadata,
310 rgMetadata: metadata.NewRowGroupMetaData(rg, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
311 props: f.props,
312 r: f.r,
313 sourceSz: f.footerOffset,
314 fileDecryptor: f.fileDecryptor,
315 bufferPool: &f.bufferPool,
316 }
317 }
318
View as plain text