1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package metadata
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "reflect"
25 "unicode/utf8"
26
27 "github.com/apache/arrow/go/v15/parquet"
28 "github.com/apache/arrow/go/v15/parquet/compress"
29 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
30 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
31 "github.com/apache/arrow/go/v15/parquet/internal/thrift"
32 "github.com/apache/arrow/go/v15/parquet/schema"
33 "golang.org/x/xerrors"
34 )
35
36
37
// DefaultCompressionType is the package-level default compression codec;
// it is initialised to the uncompressed codec.
var DefaultCompressionType = compress.Codecs.Uncompressed
39
40
41
// FileMetaDataBuilder accumulates the pieces of a file's metadata and
// assembles them into a FileMetaData when Finish is called.
//
// It holds the thrift FileMetaData being populated, the writer properties and
// schema supplied at construction, the row groups appended so far together
// with the builder returned by the most recent AppendRowGroup call, the
// key/value metadata, and the thrift crypto metadata, which is non-nil only
// when the writer properties request an encrypted footer.
type FileMetaDataBuilder struct {
	metadata       *format.FileMetaData
	props          *parquet.WriterProperties
	schema         *schema.Schema
	rowGroups      []*format.RowGroup
	currentRgBldr  *RowGroupMetaDataBuilder
	kvmeta         KeyValueMetadata
	cryptoMetadata *format.FileCryptoMetaData
}
51
52
53
54 func NewFileMetadataBuilder(schema *schema.Schema, props *parquet.WriterProperties, kvmeta KeyValueMetadata) *FileMetaDataBuilder {
55 var crypto *format.FileCryptoMetaData
56 if props.FileEncryptionProperties() != nil && props.FileEncryptionProperties().EncryptedFooter() {
57 crypto = format.NewFileCryptoMetaData()
58 }
59 return &FileMetaDataBuilder{
60 metadata: format.NewFileMetaData(),
61 props: props,
62 schema: schema,
63 kvmeta: kvmeta,
64 cryptoMetadata: crypto,
65 }
66 }
67
68
69
70 func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata {
71 if f.cryptoMetadata == nil {
72 return nil
73 }
74
75 props := f.props.FileEncryptionProperties()
76 f.cryptoMetadata.EncryptionAlgorithm = props.Algorithm().ToThrift()
77 keyMetadata := props.FooterKeyMetadata()
78 if keyMetadata != "" {
79 f.cryptoMetadata.KeyMetadata = []byte(keyMetadata)
80 }
81
82 return &FileCryptoMetadata{f.cryptoMetadata, 0}
83 }
84
85
86
87 func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder {
88 if f.rowGroups == nil {
89 f.rowGroups = make([]*format.RowGroup, 0, 1)
90 }
91
92 rg := format.NewRowGroup()
93 f.rowGroups = append(f.rowGroups, rg)
94 f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg)
95 return f.currentRgBldr
96 }
97
98
// AppendKeyValueMetadata appends a key/value pair to the builder's key/value
// metadata. It returns the error from KeyValueMetadata.Append, which rejects
// keys or values that are not valid UTF-8.
func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key string, value string) error {
	return f.kvmeta.Append(key, value)
}
102
103
104
105
106 func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) {
107 totalRows := int64(0)
108 for _, rg := range f.rowGroups {
109 totalRows += rg.NumRows
110 }
111 f.metadata.NumRows = totalRows
112 f.metadata.RowGroups = f.rowGroups
113 switch f.props.Version() {
114 case parquet.V1_0:
115 f.metadata.Version = 1
116 default:
117 f.metadata.Version = 2
118 }
119 createdBy := f.props.CreatedBy()
120 f.metadata.CreatedBy = &createdBy
121
122
123
124
125
126
127
128 typeDefined := format.NewTypeDefinedOrder()
129 colOrder := &format.ColumnOrder{TYPE_ORDER: typeDefined}
130 f.metadata.ColumnOrders = make([]*format.ColumnOrder, f.schema.NumColumns())
131 for idx := range f.metadata.ColumnOrders {
132 f.metadata.ColumnOrders[idx] = colOrder
133 }
134
135 encryptProps := f.props.FileEncryptionProperties()
136 if encryptProps != nil && !encryptProps.EncryptedFooter() {
137 var signingAlgo parquet.Algorithm
138 algo := encryptProps.Algorithm()
139 signingAlgo.Aad.AadFileUnique = algo.Aad.AadFileUnique
140 signingAlgo.Aad.SupplyAadPrefix = algo.Aad.SupplyAadPrefix
141 if !algo.Aad.SupplyAadPrefix {
142 signingAlgo.Aad.AadPrefix = algo.Aad.AadPrefix
143 }
144 signingAlgo.Algo = parquet.AesGcm
145 f.metadata.EncryptionAlgorithm = signingAlgo.ToThrift()
146 footerSigningMetadata := f.props.FileEncryptionProperties().FooterKeyMetadata()
147 if footerSigningMetadata != "" {
148 f.metadata.FooterSigningKeyMetadata = []byte(footerSigningMetadata)
149 }
150 }
151
152 f.metadata.Schema = schema.ToThrift(f.schema.Root())
153 f.metadata.KeyValueMetadata = f.kvmeta
154
155 out := &FileMetaData{
156 FileMetaData: f.metadata,
157 version: NewAppVersion(f.metadata.GetCreatedBy()),
158 }
159 if err := out.initSchema(); err != nil {
160 return nil, err
161 }
162 out.initColumnOrders()
163
164 f.metadata = format.NewFileMetaData()
165 f.rowGroups = nil
166 return out, nil
167 }
168
169
170
171
// KeyValueMetadata is a list of thrift key/value pairs with helper methods
// for appending, looking up and enumerating entries.
type KeyValueMetadata []*format.KeyValue
173
174
175 func NewKeyValueMetadata() KeyValueMetadata {
176 return make(KeyValueMetadata, 0)
177 }
178
179
180
181 func (k *KeyValueMetadata) Append(key, value string) error {
182 if !utf8.ValidString(key) || !utf8.ValidString(value) {
183 return fmt.Errorf("metadata must be valid utf8 strings, got key = '%s' and value = '%s'", key, value)
184 }
185 *k = append(*k, &format.KeyValue{Key: key, Value: &value})
186 return nil
187 }
188
// Len returns the number of key/value pairs.
func (k KeyValueMetadata) Len() int { return len(k) }
190
191
// Equals reports whether k and other are deeply equal as determined by
// reflect.DeepEqual: same length, and pairwise-equal keys and values in the
// same order. Note that reflect.DeepEqual treats a nil slice and an empty
// non-nil slice as different.
func (k KeyValueMetadata) Equals(other KeyValueMetadata) bool {
	return reflect.DeepEqual(k, other)
}
195
196 func (k KeyValueMetadata) Keys() (ret []string) {
197 ret = make([]string, len(k))
198 for idx, v := range k {
199 ret[idx] = v.GetKey()
200 }
201 return
202 }
203
204 func (k KeyValueMetadata) Values() (ret []string) {
205 ret = make([]string, len(k))
206 for idx, v := range k {
207 ret[idx] = v.GetValue()
208 }
209 return
210 }
211
212 func (k KeyValueMetadata) FindValue(key string) *string {
213 for _, v := range k {
214 if v.Key == key {
215 return v.Value
216 }
217 }
218 return nil
219 }
220
221
222
// FileMetaData wraps the thrift file metadata together with the schema
// rebuilt from it and, for encrypted files, the decryptor that was used to
// read it.
type FileMetaData struct {
	*format.FileMetaData
	Schema        *schema.Schema
	FileDecryptor encryption.FileDecryptor

	// version is the application version parsed from the created-by string;
	// WriterVersion fills it in lazily when it is nil.
	version *AppVersion

	// metadataLen is the number of bytes consumed when this metadata was
	// deserialized by NewFileMetaData; it is reported by Size.
	metadataLen int
}
234
235
236
237 func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*FileMetaData, error) {
238 meta := format.NewFileMetaData()
239 if fileDecryptor != nil {
240 footerDecryptor := fileDecryptor.GetFooterDecryptor()
241 data = footerDecryptor.Decrypt(data)
242 }
243
244 remain, err := thrift.DeserializeThrift(meta, data)
245 if err != nil {
246 return nil, err
247 }
248
249 f := &FileMetaData{
250 FileMetaData: meta,
251 version: NewAppVersion(meta.GetCreatedBy()),
252 metadataLen: len(data) - int(remain),
253 FileDecryptor: fileDecryptor,
254 }
255
256 f.initSchema()
257 f.initColumnOrders()
258
259 return f, nil
260 }
261
262
// Size returns the recorded length of the serialized metadata. It is set by
// NewFileMetaData to the number of bytes consumed during deserialization and
// is zero for metadata produced by a builder or by Subset.
func (f *FileMetaData) Size() int { return f.metadataLen }
264
265
// NumSchemaElements returns the number of elements in the thrift
// (flattened) schema list.
func (f *FileMetaData) NumSchemaElements() int {
	return len(f.FileMetaData.Schema)
}
269
270
// RowGroup returns a metadata view of the i-th row group, built from the
// thrift row group, the schema, the writer version and the file decryptor.
// The index is not bounds-checked, so an out-of-range i panics.
func (f *FileMetaData) RowGroup(i int) *RowGroupMetaData {
	return &RowGroupMetaData{
		f.RowGroups[i], f.Schema, f.version, f.FileDecryptor,
	}
}
276
// Serialize returns the thrift serialization of the underlying file metadata
// as a byte slice, using a new thrift serializer.
func (f *FileMetaData) Serialize(ctx context.Context) ([]byte, error) {
	return thrift.NewThriftSerializer().Write(ctx, f.FileMetaData)
}
280
// SerializeString returns the thrift serialization of the underlying file
// metadata as a string, using a new thrift serializer.
func (f *FileMetaData) SerializeString(ctx context.Context) (string, error) {
	return thrift.NewThriftSerializer().WriteString(ctx, f.FileMetaData)
}
284
285
286
287 func (f *FileMetaData) EncryptionAlgorithm() parquet.Algorithm {
288 if f.IsSetEncryptionAlgorithm() {
289 return parquet.AlgorithmFromThrift(f.GetEncryptionAlgorithm())
290 }
291 return parquet.Algorithm{}
292 }
293
294 func (f *FileMetaData) initSchema() error {
295 root, err := schema.FromParquet(f.FileMetaData.Schema)
296 if err != nil {
297 return err
298 }
299 f.Schema = schema.NewSchema(root.(*schema.GroupNode))
300 return nil
301 }
302
303 func (f *FileMetaData) initColumnOrders() {
304 orders := make([]parquet.ColumnOrder, 0, f.Schema.NumColumns())
305 if f.IsSetColumnOrders() {
306 for _, o := range f.GetColumnOrders() {
307 if o.IsSetTYPE_ORDER() {
308 orders = append(orders, parquet.ColumnOrders.TypeDefinedOrder)
309 } else {
310 orders = append(orders, parquet.ColumnOrders.Undefined)
311 }
312 }
313 } else {
314 orders = orders[:f.Schema.NumColumns()]
315 orders[0] = parquet.ColumnOrders.Undefined
316 for i := 1; i < len(orders); i *= 2 {
317 copy(orders[i:], orders[:i])
318 }
319 }
320 f.Schema.UpdateColumnOrders(orders)
321 }
322
323
324
325 func (f *FileMetaData) WriterVersion() *AppVersion {
326 if f.version == nil {
327 f.version = NewAppVersion(f.GetCreatedBy())
328 }
329 return f.version
330 }
331
332
333 func (f *FileMetaData) SetFilePath(path string) {
334 for _, rg := range f.RowGroups {
335 for _, chunk := range rg.Columns {
336 chunk.FilePath = &path
337 }
338 }
339 }
340
341
342
343 func (f *FileMetaData) AppendRowGroups(other *FileMetaData) error {
344 if !f.Schema.Equals(other.Schema) {
345 return xerrors.New("parquet/FileMetaData: AppendRowGroups requires equal schemas")
346 }
347
348 f.RowGroups = append(f.RowGroups, other.GetRowGroups()...)
349 for _, rg := range other.GetRowGroups() {
350 f.NumRows += rg.NumRows
351 }
352 return nil
353 }
354
355
356
357 func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) {
358 for _, i := range rowGroups {
359 if i < len(f.RowGroups) {
360 continue
361 }
362 return nil, fmt.Errorf("parquet: this file only has %d row groups, but requested a subset including row group: %d", len(f.RowGroups), i)
363 }
364
365 out := &FileMetaData{
366 &format.FileMetaData{
367 Schema: f.FileMetaData.Schema,
368 CreatedBy: f.CreatedBy,
369 ColumnOrders: f.GetColumnOrders(),
370 EncryptionAlgorithm: f.FileMetaData.EncryptionAlgorithm,
371 FooterSigningKeyMetadata: f.FooterSigningKeyMetadata,
372 Version: f.FileMetaData.Version,
373 KeyValueMetadata: f.KeyValueMetadata(),
374 },
375 f.Schema,
376 f.FileDecryptor,
377 f.version,
378 0,
379 }
380
381 out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))
382 for _, selected := range rowGroups {
383 out.RowGroups = append(out.RowGroups, f.RowGroups[selected])
384 out.NumRows += f.RowGroups[selected].GetNumRows()
385 }
386
387 return out, nil
388 }
389
// Equals reports whether the underlying thrift metadata of f and other are
// deeply equal according to reflect.DeepEqual. Only the embedded thrift
// struct is compared; the schema, decryptor, version and recorded size are
// not. other must be non-nil.
func (f *FileMetaData) Equals(other *FileMetaData) bool {
	return reflect.DeepEqual(f.FileMetaData, other.FileMetaData)
}
393
// KeyValueMetadata returns the key/value metadata stored in the underlying
// thrift metadata, typed as KeyValueMetadata.
func (f *FileMetaData) KeyValueMetadata() KeyValueMetadata {
	return f.GetKeyValueMetadata()
}
397
398
399
400
401
402 func (f *FileMetaData) VerifySignature(signature []byte) bool {
403 if f.FileDecryptor == nil {
404 panic("decryption not set properly, cannot verify signature")
405 }
406
407 serializer := thrift.NewThriftSerializer()
408 data, _ := serializer.Write(context.Background(), f.FileMetaData)
409 nonce := signature[:encryption.NonceLength]
410 tag := signature[encryption.NonceLength : encryption.NonceLength+encryption.GcmTagLength]
411
412 key := f.FileDecryptor.GetFooterKey()
413 aad := encryption.CreateFooterAad(f.FileDecryptor.FileAad())
414
415 enc := encryption.NewAesEncryptor(f.FileDecryptor.Algorithm(), true)
416 var buf bytes.Buffer
417 buf.Grow(enc.CiphertextSizeDelta() + len(data))
418 encryptedLen := enc.SignedFooterEncrypt(&buf, data, []byte(key), []byte(aad), nonce)
419 return bytes.Equal(buf.Bytes()[encryptedLen-encryption.GcmTagLength:], tag)
420 }
421
422
423
424
425
426
// WriteTo serializes the file metadata to w and returns the number of bytes
// written.
//
// When an encryption algorithm is set on the metadata, the footer is written
// in plaintext, followed by a nonce and a GCM tag taken from the encrypted
// form of the same bytes. encryptor must be non-nil in that case, as it is
// used unconditionally. Otherwise the metadata is handed to the thrift
// serializer together with encryptor.
func (f *FileMetaData) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int64, error) {
	serializer := thrift.NewThriftSerializer()

	if f.IsSetEncryptionAlgorithm() {
		data, err := serializer.Write(context.Background(), f.FileMetaData)
		if err != nil {
			return 0, err
		}

		// Encrypt the serialized footer into a scratch buffer; only the nonce
		// and tag portions of the output are written out below.
		var buf bytes.Buffer
		buf.Grow(encryptor.CiphertextSizeDelta() + len(data))
		encryptedLen := encryptor.Encrypt(&buf, data)

		wrote := 0
		n := 0

		// The footer itself goes out unencrypted.
		if n, err = w.Write(data); err != nil {
			return int64(n), err
		}
		wrote += n

		// Skip the first 4 bytes of the encrypted output (presumably a length
		// prefix; confirm against the encryptor), then emit the nonce.
		buf.Next(4)
		if n, err = w.Write(buf.Next(encryption.NonceLength)); err != nil {
			return int64(wrote + n), err
		}
		wrote += n
		// Skip everything between the nonce and the trailing tag, then emit
		// the final GcmTagLength bytes.
		buf.Next(encryptedLen - 4 - encryption.NonceLength - encryption.GcmTagLength)
		n, err = w.Write(buf.Next(encryption.GcmTagLength))
		return int64(wrote + n), err
	}
	n, err := serializer.Serialize(f.FileMetaData, w, encryptor)
	return int64(n), err
}
461
462
463
464
465
466
467
468 func (f *FileMetaData) Version() parquet.Version {
469 switch f.FileMetaData.Version {
470 case 1:
471 return parquet.V1_0
472 case 2:
473 return parquet.V2_LATEST
474 default:
475
476 return parquet.V1_0
477 }
478 }
479
480
// FileCryptoMetadata wraps the thrift file crypto metadata together with the
// number of bytes its serialized form occupied when it was read.
type FileCryptoMetadata struct {
	metadata          *format.FileCryptoMetaData
	cryptoMetadataLen uint32
}
485
486
487
488 func NewFileCryptoMetaData(metadata []byte) (ret FileCryptoMetadata, err error) {
489 ret.metadata = format.NewFileCryptoMetaData()
490 var remain uint64
491 remain, err = thrift.DeserializeThrift(ret.metadata, metadata)
492 ret.cryptoMetadataLen = uint32(uint64(len(metadata)) - remain)
493 return
494 }
495
496
497 func (fc FileCryptoMetadata) WriteTo(w io.Writer) (int64, error) {
498 serializer := thrift.NewThriftSerializer()
499 n, err := serializer.Serialize(fc.metadata, w, nil)
500 return int64(n), err
501 }
502
503
// Len returns the recorded serialized length of the crypto metadata. It is
// set by NewFileCryptoMetaData and is zero for values obtained from
// FileMetaDataBuilder.GetFileCryptoMetaData.
func (fc FileCryptoMetadata) Len() int { return int(fc.cryptoMetadataLen) }
505
// KeyMetadata returns the key metadata bytes stored in the crypto metadata.
// The slice is returned without copying.
func (fc FileCryptoMetadata) KeyMetadata() []byte {
	return fc.metadata.KeyMetadata
}
509
510
511
// EncryptionAlgorithm returns the encryption algorithm stored in the crypto
// metadata, converted from its thrift representation.
func (fc FileCryptoMetadata) EncryptionAlgorithm() parquet.Algorithm {
	return parquet.AlgorithmFromThrift(fc.metadata.GetEncryptionAlgorithm())
}
515
View as plain text