1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package parquet
18
19 import (
20 "crypto/rand"
21 "unicode/utf8"
22
23 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
24 )
25
26
// Default settings and size constants for Parquet encryption and decryption
// properties.
const (
	// DefaultEncryptionAlgorithm is the cipher NewFileEncryptionProperties
	// starts with before any options are applied.
	DefaultEncryptionAlgorithm = AesGcm
	// MaximalAadMetadataLength is not referenced in this part of the file.
	// NOTE(review): presumably the upper bound on stored AAD metadata length
	// in bytes — confirm against its users.
	MaximalAadMetadataLength int32 = 256

	// DefaultEncryptedFooter is the initial footer-encryption setting in
	// NewFileEncryptionProperties; WithPlaintextFooter turns it off.
	DefaultEncryptedFooter = true
	// DefaultCheckSignature is the initial footer-integrity-check setting in
	// NewFileDecryptionProperties; DisableFooterSignatureVerification turns
	// it off.
	DefaultCheckSignature = true

	// DefaultAllowPlaintextFiles is the initial plaintext-allowed setting in
	// NewFileDecryptionProperties; WithPlaintextAllowed turns it on.
	DefaultAllowPlaintextFiles = false
	// AadFileUniqueLength is the number of random bytes
	// NewFileEncryptionProperties generates for the file-unique AAD part.
	AadFileUniqueLength int32 = 8
)
39
40
// ColumnPathToDecryptionPropsMap maps a column path to the decryption
// properties of that column.
type ColumnPathToDecryptionPropsMap map[string]*ColumnDecryptionProperties
42
43
// ColumnPathToEncryptionPropsMap maps a column path to the encryption
// properties of that column.
type ColumnPathToEncryptionPropsMap map[string]*ColumnEncryptionProperties
45
46
47 type ColumnEncryptionProperties struct {
48 columnPath string
49 encrypted bool
50 encryptedWithFooterKey bool
51 key string
52 keyMetadata string
53 utilized bool
54 }
55
56
57 func (ce *ColumnEncryptionProperties) ColumnPath() string {
58 return ce.columnPath
59 }
60
61
62 func (ce *ColumnEncryptionProperties) IsEncrypted() bool { return ce.encrypted }
63
64
65
66 func (ce *ColumnEncryptionProperties) IsEncryptedWithFooterKey() bool {
67 return ce.encryptedWithFooterKey
68 }
69
70
71 func (ce *ColumnEncryptionProperties) Key() string { return ce.key }
72
73
74
75 func (ce *ColumnEncryptionProperties) KeyMetadata() string { return ce.keyMetadata }
76
77
78 func (ce *ColumnEncryptionProperties) WipeOutEncryptionKey() { ce.key = "" }
79
80
81
82 func (ce *ColumnEncryptionProperties) IsUtilized() bool {
83 if ce.key == "" {
84 return false
85 }
86 return ce.utilized
87 }
88
89
90
91 func (ce *ColumnEncryptionProperties) SetUtilized() {
92 ce.utilized = true
93 }
94
95
96 func (ce *ColumnEncryptionProperties) Clone() *ColumnEncryptionProperties {
97 copy := ce.key
98 return NewColumnEncryptionProperties(ce.columnPath, WithKey(copy), WithKeyMetadata(ce.keyMetadata))
99 }
100
// colEncryptConfig collects the values set by ColumnEncryptOption functions
// before NewColumnEncryptionProperties copies them into the result.
type colEncryptConfig struct {
	key         string // column key; empty means no column-specific key
	keyMetadata string // metadata stored with the key
	encrypted   bool   // set to true by NewColumnEncryptionProperties before options run
}
106
107
// ColumnEncryptOption is a functional option that adjusts the configuration
// used by NewColumnEncryptionProperties.
type ColumnEncryptOption func(*colEncryptConfig)
109
110
111
112
113
114
115 func WithKey(key string) ColumnEncryptOption {
116 return func(c *colEncryptConfig) {
117 if key != "" {
118 c.key = key
119 }
120 }
121 }
122
123
124 func WithKeyMetadata(keyMeta string) ColumnEncryptOption {
125 return func(c *colEncryptConfig) {
126 c.keyMetadata = keyMeta
127 }
128 }
129
130
131
132
133 func WithKeyID(keyID string) ColumnEncryptOption {
134 if !utf8.ValidString(keyID) {
135 panic("parquet: key id should be UTF8 encoded")
136 }
137 return WithKeyMetadata(keyID)
138 }
139
140
141 func NewColumnEncryptionProperties(name string, opts ...ColumnEncryptOption) *ColumnEncryptionProperties {
142 var cfg colEncryptConfig
143 cfg.encrypted = true
144 for _, o := range opts {
145 o(&cfg)
146 }
147 return &ColumnEncryptionProperties{
148 utilized: false,
149 encrypted: cfg.encrypted,
150 encryptedWithFooterKey: cfg.encrypted && cfg.key == "",
151 keyMetadata: cfg.keyMetadata,
152 key: cfg.key,
153 columnPath: name,
154 }
155 }
156
157
158 type ColumnDecryptionProperties struct {
159 columnPath string
160 key string
161 utilized bool
162 }
163
164
165
166 func NewColumnDecryptionProperties(column string, opts ...ColumnDecryptOption) *ColumnDecryptionProperties {
167 var cfg columnDecryptConfig
168 for _, o := range opts {
169 o(&cfg)
170 }
171
172 return &ColumnDecryptionProperties{
173 columnPath: column,
174 utilized: false,
175 key: cfg.key,
176 }
177 }
178
179
180 func (cd *ColumnDecryptionProperties) ColumnPath() string { return cd.columnPath }
181
182
183 func (cd *ColumnDecryptionProperties) Key() string { return cd.key }
184
185
186 func (cd *ColumnDecryptionProperties) IsUtilized() bool { return cd.utilized }
187
188
189
190 func (cd *ColumnDecryptionProperties) SetUtilized() { cd.utilized = true }
191
192
193 func (cd *ColumnDecryptionProperties) WipeOutDecryptionKey() { cd.key = "" }
194
195
196 func (cd *ColumnDecryptionProperties) Clone() *ColumnDecryptionProperties {
197 return NewColumnDecryptionProperties(cd.columnPath, WithDecryptKey(cd.key))
198 }
199
// columnDecryptConfig collects the values set by ColumnDecryptOption
// functions before NewColumnDecryptionProperties copies them into the result.
type columnDecryptConfig struct {
	key string // column decryption key; empty means none was provided
}
203
204
// ColumnDecryptOption is a functional option that adjusts the configuration
// used by NewColumnDecryptionProperties.
type ColumnDecryptOption func(*columnDecryptConfig)
206
207
208 func WithDecryptKey(key string) ColumnDecryptOption {
209 return func(cfg *columnDecryptConfig) {
210 if key != "" {
211 cfg.key = key
212 }
213 }
214 }
215
216
217
218
219
// AADPrefixVerifier is supplied by callers through WithPrefixVerifier and is
// stored on FileDecryptionProperties.Verifier.
// NOTE(review): where and when Verify is invoked is not visible in this part
// of the file — confirm against the decryption code.
type AADPrefixVerifier interface {
	// Verify receives a string to check and returns nothing, so it has no
	// return value with which to report a rejection.
	// NOTE(review): presumably implementations panic on a bad prefix — confirm.
	Verify(string)
}
224
225
226
// DecryptionKeyRetriever is supplied by callers through WithKeyRetriever and
// is stored on FileDecryptionProperties.KeyRetriever.
type DecryptionKeyRetriever interface {
	// GetKey returns a key, as a string, for the given key metadata bytes.
	// NOTE(review): the expected result for unknown metadata is not visible
	// in this part of the file — confirm against callers.
	GetKey(keyMetadata []byte) string
}
230
231
232
233 type FileDecryptionProperties struct {
234 footerKey string
235 aadPrefix string
236 checkPlaintextFooterIntegrity bool
237 plaintextAllowed bool
238 utilized bool
239 columnDecryptProps ColumnPathToDecryptionPropsMap
240 Verifier AADPrefixVerifier
241 KeyRetriever DecryptionKeyRetriever
242 }
243
244
245
246
247 func NewFileDecryptionProperties(opts ...FileDecryptionOption) *FileDecryptionProperties {
248 var cfg fileDecryptConfig
249 cfg.checkFooterIntegrity = DefaultCheckSignature
250 cfg.plaintextAllowed = DefaultAllowPlaintextFiles
251 for _, o := range opts {
252 o(&cfg)
253 }
254 return &FileDecryptionProperties{
255 Verifier: cfg.verifier,
256 footerKey: cfg.footerKey,
257 checkPlaintextFooterIntegrity: cfg.checkFooterIntegrity,
258 KeyRetriever: cfg.retriever,
259 aadPrefix: cfg.aadPrefix,
260 columnDecryptProps: cfg.colDecrypt,
261 plaintextAllowed: cfg.plaintextAllowed,
262 utilized: false,
263 }
264 }
265
266
267 func (fd *FileDecryptionProperties) ColumnKey(path string) string {
268 if d, ok := fd.columnDecryptProps[path]; ok {
269 if d != nil {
270 return d.Key()
271 }
272 }
273 return ""
274 }
275
276
277
278 func (fd *FileDecryptionProperties) FooterKey() string { return fd.footerKey }
279
280
281 func (fd *FileDecryptionProperties) AadPrefix() string { return fd.aadPrefix }
282
283
284
285 func (fd *FileDecryptionProperties) PlaintextFooterIntegrity() bool {
286 return fd.checkPlaintextFooterIntegrity
287 }
288
289
290 func (fd *FileDecryptionProperties) PlaintextFilesAllowed() bool { return fd.plaintextAllowed }
291
292
293
294 func (fd *FileDecryptionProperties) SetUtilized() { fd.utilized = true }
295
296
297
298 func (fd *FileDecryptionProperties) IsUtilized() bool {
299 if fd.footerKey == "" && len(fd.columnDecryptProps) == 0 && fd.aadPrefix == "" {
300 return false
301 }
302 return fd.utilized
303 }
304
305
306
307 func (fd *FileDecryptionProperties) WipeOutDecryptionKeys() {
308 fd.footerKey = ""
309 for _, cd := range fd.columnDecryptProps {
310 cd.WipeOutDecryptionKey()
311 }
312 }
313
314
315 func (fd *FileDecryptionProperties) Clone(newAadPrefix string) *FileDecryptionProperties {
316 keyCopy := fd.footerKey
317 colDecryptMapCopy := make(ColumnPathToDecryptionPropsMap)
318 for k, v := range fd.columnDecryptProps {
319 colDecryptMapCopy[k] = v.Clone()
320 }
321 if newAadPrefix == "" {
322 newAadPrefix = fd.aadPrefix
323 }
324 return &FileDecryptionProperties{
325 footerKey: keyCopy,
326 KeyRetriever: fd.KeyRetriever,
327 checkPlaintextFooterIntegrity: fd.checkPlaintextFooterIntegrity,
328 Verifier: fd.Verifier,
329 columnDecryptProps: colDecryptMapCopy,
330 aadPrefix: newAadPrefix,
331 plaintextAllowed: fd.plaintextAllowed,
332 utilized: false,
333 }
334 }
335
// fileDecryptConfig collects the values set by FileDecryptionOption functions
// before NewFileDecryptionProperties copies them into the result.
type fileDecryptConfig struct {
	footerKey            string                         // set by WithFooterKey
	aadPrefix            string                         // set by WithDecryptAadPrefix
	verifier             AADPrefixVerifier              // set by WithPrefixVerifier
	colDecrypt           ColumnPathToDecryptionPropsMap // set by WithColumnKeys
	retriever            DecryptionKeyRetriever         // set by WithKeyRetriever
	checkFooterIntegrity bool                           // cleared by DisableFooterSignatureVerification
	plaintextAllowed     bool                           // set by WithPlaintextAllowed
}
345
346
// FileDecryptionOption is a functional option that adjusts the configuration
// used by NewFileDecryptionProperties.
type FileDecryptionOption func(*fileDecryptConfig)
348
349
350
351
352
353
354
355
356
357
358
359
360 func WithFooterKey(key string) FileDecryptionOption {
361 return func(cfg *fileDecryptConfig) {
362 if key != "" {
363 cfg.footerKey = key
364 }
365 }
366 }
367
368
369 func WithPrefixVerifier(verifier AADPrefixVerifier) FileDecryptionOption {
370 return func(cfg *fileDecryptConfig) {
371 if verifier != nil {
372 cfg.verifier = verifier
373 }
374 }
375 }
376
377
378
379
380
381
382
383
384
385 func WithColumnKeys(decrypt ColumnPathToDecryptionPropsMap) FileDecryptionOption {
386 return func(cfg *fileDecryptConfig) {
387 if len(decrypt) == 0 {
388 return
389 }
390 if len(cfg.colDecrypt) != 0 {
391 panic("column properties already set")
392 }
393 for _, v := range decrypt {
394 if v.IsUtilized() {
395 panic("parquet: column properties utilized in another file")
396 }
397 v.SetUtilized()
398 }
399 cfg.colDecrypt = decrypt
400 }
401 }
402
403
404 func WithKeyRetriever(retriever DecryptionKeyRetriever) FileDecryptionOption {
405 return func(cfg *fileDecryptConfig) {
406 if retriever != nil {
407 cfg.retriever = retriever
408 }
409 }
410 }
411
412
413
414
415
416
417 func DisableFooterSignatureVerification() FileDecryptionOption {
418 return func(cfg *fileDecryptConfig) {
419 cfg.checkFooterIntegrity = false
420 }
421 }
422
423
424
425
426
427
428
429
430 func WithPlaintextAllowed() FileDecryptionOption {
431 return func(cfg *fileDecryptConfig) {
432 cfg.plaintextAllowed = true
433 }
434 }
435
436
437
438
439 func WithDecryptAadPrefix(prefix string) FileDecryptionOption {
440 return func(cfg *fileDecryptConfig) {
441 if prefix != "" {
442 cfg.aadPrefix = prefix
443 }
444 }
445 }
446
447
448
449 type Algorithm struct {
450 Algo Cipher
451 Aad struct {
452 AadPrefix []byte
453 AadFileUnique []byte
454 SupplyAadPrefix bool
455 }
456 }
457
458
459 func (e Algorithm) ToThrift() *format.EncryptionAlgorithm {
460 if e.Algo == AesGcm {
461 return &format.EncryptionAlgorithm{
462 AES_GCM_V1: &format.AesGcmV1{
463 AadPrefix: e.Aad.AadPrefix,
464 AadFileUnique: e.Aad.AadFileUnique,
465 SupplyAadPrefix: &e.Aad.SupplyAadPrefix,
466 },
467 }
468 }
469 return &format.EncryptionAlgorithm{
470 AES_GCM_CTR_V1: &format.AesGcmCtrV1{
471 AadPrefix: e.Aad.AadPrefix,
472 AadFileUnique: e.Aad.AadFileUnique,
473 SupplyAadPrefix: &e.Aad.SupplyAadPrefix,
474 },
475 }
476 }
477
478
479 func AlgorithmFromThrift(enc *format.EncryptionAlgorithm) (ret Algorithm) {
480 if enc.IsSetAES_GCM_V1() {
481 ret.Algo = AesGcm
482 ret.Aad.AadFileUnique = enc.AES_GCM_V1.AadFileUnique
483 ret.Aad.AadPrefix = enc.AES_GCM_V1.AadPrefix
484 ret.Aad.SupplyAadPrefix = *enc.AES_GCM_V1.SupplyAadPrefix
485 return
486 }
487 ret.Algo = AesCtr
488 ret.Aad.AadFileUnique = enc.AES_GCM_CTR_V1.AadFileUnique
489 ret.Aad.AadPrefix = enc.AES_GCM_CTR_V1.AadPrefix
490 ret.Aad.SupplyAadPrefix = *enc.AES_GCM_CTR_V1.SupplyAadPrefix
491 return
492 }
493
494
495 type FileEncryptionProperties struct {
496 alg Algorithm
497 footerKey string
498 footerKeyMetadata string
499 encryptedFooter bool
500 fileAad string
501 utilized bool
502 storeAadPrefixInFile bool
503 aadPrefix string
504 encryptedCols ColumnPathToEncryptionPropsMap
505 }
506
507
508 func (fe *FileEncryptionProperties) EncryptedFooter() bool { return fe.encryptedFooter }
509
510
511 func (fe *FileEncryptionProperties) Algorithm() Algorithm { return fe.alg }
512
513
514
515 func (fe *FileEncryptionProperties) FooterKey() string { return fe.footerKey }
516
517
518 func (fe *FileEncryptionProperties) FooterKeyMetadata() string { return fe.footerKeyMetadata }
519
520
521
522 func (fe *FileEncryptionProperties) FileAad() string { return fe.fileAad }
523
524
525 func (fe *FileEncryptionProperties) IsUtilized() bool { return fe.utilized }
526
527
528
529 func (fe *FileEncryptionProperties) SetUtilized() { fe.utilized = true }
530
531
532 func (fe *FileEncryptionProperties) EncryptedColumns() ColumnPathToEncryptionPropsMap {
533 return fe.encryptedCols
534 }
535
536
537
538
539 func (fe *FileEncryptionProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties {
540 if len(fe.encryptedCols) == 0 {
541 return NewColumnEncryptionProperties(path)
542 }
543 if c, ok := fe.encryptedCols[path]; ok {
544 return c
545 }
546 return nil
547 }
548
549
550
551
552 func (fe *FileEncryptionProperties) Clone(newAadPrefix string) *FileEncryptionProperties {
553 footerKeyCopy := fe.footerKey
554 encryptedColsCopy := make(ColumnPathToEncryptionPropsMap)
555 for k, v := range fe.encryptedCols {
556 encryptedColsCopy[k] = v.Clone()
557 }
558 if newAadPrefix == "" {
559 newAadPrefix = fe.aadPrefix
560 }
561
562 opts := []EncryptOption{
563 WithAlg(fe.alg.Algo), WithFooterKeyMetadata(fe.footerKeyMetadata),
564 WithAadPrefix(newAadPrefix), WithEncryptedColumns(encryptedColsCopy),
565 }
566 if !fe.encryptedFooter {
567 opts = append(opts, WithPlaintextFooter())
568 }
569 if !fe.storeAadPrefixInFile {
570 opts = append(opts, DisableAadPrefixStorage())
571 }
572 return NewFileEncryptionProperties(footerKeyCopy, opts...)
573 }
574
575
576 func (fe *FileEncryptionProperties) WipeOutEncryptionKeys() {
577 fe.footerKey = ""
578 for _, elem := range fe.encryptedCols {
579 elem.WipeOutEncryptionKey()
580 }
581 }
582
// configEncrypt collects the values set by EncryptOption functions before
// NewFileEncryptionProperties copies them into the result.
type configEncrypt struct {
	cipher               Cipher                         // set by WithAlg
	encryptFooter        bool                           // cleared by WithPlaintextFooter
	keyMetadata          string                         // set by WithFooterKeyMetadata
	aadprefix            string                         // set by WithAadPrefix
	storeAadPrefixInFile bool                           // set by WithAadPrefix, cleared by DisableAadPrefixStorage
	encryptedCols        ColumnPathToEncryptionPropsMap // set by WithEncryptedColumns
}
591
592
// EncryptOption is a functional option that adjusts the configuration used by
// NewFileEncryptionProperties.
type EncryptOption func(*configEncrypt)
594
595
596
597 func WithPlaintextFooter() EncryptOption {
598 return func(cfg *configEncrypt) {
599 cfg.encryptFooter = false
600 }
601 }
602
603
604 func WithAlg(cipher Cipher) EncryptOption {
605 return func(cfg *configEncrypt) {
606 cfg.cipher = cipher
607 }
608 }
609
610
611
612
613 func WithFooterKeyID(key string) EncryptOption {
614 if !utf8.ValidString(key) {
615 panic("parquet: footer key id should be UTF8 encoded")
616 }
617 return WithFooterKeyMetadata(key)
618 }
619
620
621
622
623 func WithFooterKeyMetadata(keyMeta string) EncryptOption {
624 return func(cfg *configEncrypt) {
625 if keyMeta != "" {
626 cfg.keyMetadata = keyMeta
627 }
628 }
629 }
630
631
632 func WithAadPrefix(aadPrefix string) EncryptOption {
633 return func(cfg *configEncrypt) {
634 if aadPrefix != "" {
635 cfg.aadprefix = aadPrefix
636 cfg.storeAadPrefixInFile = true
637 }
638 }
639 }
640
641
642
643 func DisableAadPrefixStorage() EncryptOption {
644 return func(cfg *configEncrypt) {
645 cfg.storeAadPrefixInFile = false
646 }
647 }
648
649
650
651 func WithEncryptedColumns(encrypted ColumnPathToEncryptionPropsMap) EncryptOption {
652 none := func(*configEncrypt) {}
653 if len(encrypted) == 0 {
654 return none
655 }
656 return func(cfg *configEncrypt) {
657 if len(cfg.encryptedCols) != 0 {
658 panic("column properties already set")
659 }
660 for _, v := range encrypted {
661 if v.IsUtilized() {
662 panic("column properties utilized in another file")
663 }
664 v.SetUtilized()
665 }
666 cfg.encryptedCols = encrypted
667 }
668 }
669
670
671 func NewFileEncryptionProperties(footerKey string, opts ...EncryptOption) *FileEncryptionProperties {
672 var cfg configEncrypt
673 cfg.cipher = DefaultEncryptionAlgorithm
674 cfg.encryptFooter = DefaultEncryptedFooter
675 for _, o := range opts {
676 o(&cfg)
677 }
678
679 props := &FileEncryptionProperties{
680 footerKey: footerKey,
681 footerKeyMetadata: cfg.keyMetadata,
682 encryptedFooter: cfg.encryptFooter,
683 aadPrefix: cfg.aadprefix,
684 storeAadPrefixInFile: cfg.storeAadPrefixInFile,
685 encryptedCols: cfg.encryptedCols,
686 utilized: false,
687 }
688
689 aadFileUnique := [AadFileUniqueLength]uint8{}
690 _, err := rand.Read(aadFileUnique[:])
691 if err != nil {
692 panic(err)
693 }
694
695 supplyAadPrefix := false
696 if props.aadPrefix == "" {
697 props.fileAad = string(aadFileUnique[:])
698 } else {
699 props.fileAad = props.aadPrefix + string(aadFileUnique[:])
700 if !props.storeAadPrefixInFile {
701 supplyAadPrefix = true
702 }
703 }
704 props.alg.Algo = cfg.cipher
705 props.alg.Aad.AadFileUnique = aadFileUnique[:]
706 props.alg.Aad.SupplyAadPrefix = supplyAadPrefix
707 if cfg.aadprefix != "" && cfg.storeAadPrefixInFile {
708 props.alg.Aad.AadPrefix = []byte(props.aadPrefix)
709 }
710 return props
711 }
712
View as plain text