1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package parquet_test
18
19 import (
20 "encoding/binary"
21 "fmt"
22 "os"
23 "path"
24 "testing"
25
26 "github.com/apache/arrow/go/v15/arrow/memory"
27 "github.com/apache/arrow/go/v15/parquet"
28 "github.com/apache/arrow/go/v15/parquet/file"
29 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
30 "github.com/stretchr/testify/suite"
31 )
32
33
83
// getDataDir returns the directory named by the PARQUET_TEST_DATA environment
// variable. It panics when the variable is unset or empty, because the tests
// in this file cannot locate their input files without it.
func getDataDir() string {
	if dir := os.Getenv("PARQUET_TEST_DATA"); dir != "" {
		return dir
	}
	panic("please point the PARQUET_TEST_DATA environment variable to the test data dir")
}
91
92 type TestDecryptionSuite struct {
93 suite.Suite
94
95 pathToDouble string
96 pathToFloat string
97 decryptionConfigs []*parquet.FileDecryptionProperties
98 footerEncryptionKey string
99 colEncryptionKey1 string
100 colEncryptionKey2 string
101 fileName string
102 rowsPerRG int
103 }
104
105 func (d *TestDecryptionSuite) TearDownSuite() {
106 os.Remove(tempdir)
107 }
108
109 func TestFileEncryptionDecryption(t *testing.T) {
110 suite.Run(t, new(EncryptionConfigTestSuite))
111 suite.Run(t, new(TestDecryptionSuite))
112 }
113
114 func (d *TestDecryptionSuite) SetupSuite() {
115 d.pathToDouble = "double_field"
116 d.pathToFloat = "float_field"
117 d.footerEncryptionKey = FooterEncryptionKey
118 d.colEncryptionKey1 = ColumnEncryptionKey1
119 d.colEncryptionKey2 = ColumnEncryptionKey2
120 d.fileName = FileName
121 d.rowsPerRG = 50
122
123 d.createDecryptionConfigs()
124 }
125
126 func (d *TestDecryptionSuite) createDecryptionConfigs() {
127
128
129 stringKr1 := make(encryption.StringKeyIDRetriever)
130 stringKr1.PutKey("kf", d.footerEncryptionKey)
131 stringKr1.PutKey("kc1", d.colEncryptionKey1)
132 stringKr1.PutKey("kc2", d.colEncryptionKey2)
133
134 d.decryptionConfigs = append(d.decryptionConfigs,
135 parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1)))
136
137
138
139 stringKr2 := make(encryption.StringKeyIDRetriever)
140 stringKr2.PutKey("kf", d.footerEncryptionKey)
141 stringKr2.PutKey("kc1", d.colEncryptionKey1)
142 stringKr2.PutKey("kc2", d.colEncryptionKey2)
143 d.decryptionConfigs = append(d.decryptionConfigs,
144 parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr2), parquet.WithDecryptAadPrefix(d.fileName)))
145
146
147
148 decryptCols := make(parquet.ColumnPathToDecryptionPropsMap)
149 decryptCols[d.pathToFloat] = parquet.NewColumnDecryptionProperties(d.pathToFloat, parquet.WithDecryptKey(d.colEncryptionKey2))
150 decryptCols[d.pathToDouble] = parquet.NewColumnDecryptionProperties(d.pathToDouble, parquet.WithDecryptKey(d.colEncryptionKey1))
151 d.decryptionConfigs = append(d.decryptionConfigs,
152 parquet.NewFileDecryptionProperties(parquet.WithFooterKey(d.footerEncryptionKey), parquet.WithColumnKeys(decryptCols)))
153
154
155
156 d.decryptionConfigs = append(d.decryptionConfigs, nil)
157 }
158
159 func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int) {
160
161
162 props := parquet.NewReaderProperties(memory.DefaultAllocator)
163 if decryptConfigNum != 3 {
164 props.FileDecryptProps = d.decryptionConfigs[decryptConfigNum].Clone("")
165 }
166
167 fileReader, err := file.OpenParquetFile(filename, false, file.WithReadProps(props))
168 if err != nil {
169 panic(err)
170 }
171 defer fileReader.Close()
172
173 fileMetadata := fileReader.MetaData()
174
175 numRowGroups := len(fileMetadata.RowGroups)
176
177 numColumns := fileMetadata.Schema.NumColumns()
178 d.Equal(8, numColumns)
179
180 for r := 0; r < numRowGroups; r++ {
181 rowGroupReader := fileReader.RowGroup(r)
182
183
184 rgMeta := fileMetadata.RowGroup(r)
185 d.EqualValues(d.rowsPerRG, rgMeta.NumRows())
186
187 valuesRead := 0
188 rowsRead := int64(0)
189
190
191 colReader, err := rowGroupReader.Column(0)
192 if err != nil {
193 panic(err)
194 }
195 boolReader := colReader.(*file.BooleanColumnChunkReader)
196
197
198 boolMd, _ := rgMeta.ColumnChunk(0)
199 d.EqualValues(d.rowsPerRG, boolMd.NumValues())
200
201
202 i := 0
203 for boolReader.HasNext() {
204 var val [1]bool
205
206
207 rowsRead, valuesRead, _ = boolReader.ReadBatch(1, val[:], nil, nil)
208
209 d.EqualValues(1, rowsRead)
210
211 d.EqualValues(1, valuesRead)
212
213 expected := i%2 == 0
214 d.Equal(expected, val[0], "i: ", i)
215 i++
216 }
217 d.EqualValues(i, boolMd.NumValues())
218
219
220 colReader, err = rowGroupReader.Column(1)
221 if err != nil {
222 panic(err)
223 }
224 int32reader := colReader.(*file.Int32ColumnChunkReader)
225
226 int32md, _ := rgMeta.ColumnChunk(1)
227 d.EqualValues(d.rowsPerRG, int32md.NumValues())
228
229 i = 0
230 for int32reader.HasNext() {
231 var val [1]int32
232
233
234 rowsRead, valuesRead, _ = int32reader.ReadBatch(1, val[:], nil, nil)
235
236 d.EqualValues(1, rowsRead)
237
238 d.EqualValues(1, valuesRead)
239
240 d.EqualValues(i, val[0])
241 i++
242 }
243 d.EqualValues(i, int32md.NumValues())
244
245
246 colReader, err = rowGroupReader.Column(2)
247 if err != nil {
248 panic(err)
249 }
250 int64reader := colReader.(*file.Int64ColumnChunkReader)
251
252 int64md, _ := rgMeta.ColumnChunk(2)
253
254 d.EqualValues(2*d.rowsPerRG, int64md.NumValues())
255
256 i = 0
257 for int64reader.HasNext() {
258 var (
259 val [1]int64
260 def [1]int16
261 rep [1]int16
262 )
263
264
265
266 rowsRead, valuesRead, _ = int64reader.ReadBatch(1, val[:], def[:], rep[:])
267
268 d.EqualValues(1, rowsRead)
269
270 d.EqualValues(1, valuesRead)
271
272 expectedValue := int64(i) * 1000 * 1000 * 1000 * 1000
273 d.Equal(expectedValue, val[0])
274 if i%2 == 0 {
275 d.EqualValues(1, rep[0])
276 } else {
277 d.Zero(rep[0])
278 }
279 i++
280 }
281 d.EqualValues(i, int64md.NumValues())
282
283
284 colReader, err = rowGroupReader.Column(3)
285 if err != nil {
286 panic(err)
287 }
288 int96reader := colReader.(*file.Int96ColumnChunkReader)
289
290 int96md, _ := rgMeta.ColumnChunk(3)
291
292 i = 0
293 for int96reader.HasNext() {
294 var (
295 val [1]parquet.Int96
296 )
297
298
299
300 rowsRead, valuesRead, _ = int96reader.ReadBatch(1, val[:], nil, nil)
301
302 d.EqualValues(1, rowsRead)
303
304 d.EqualValues(1, valuesRead)
305
306 var expectedValue parquet.Int96
307 binary.LittleEndian.PutUint32(expectedValue[:4], uint32(i))
308 binary.LittleEndian.PutUint32(expectedValue[4:], uint32(i+1))
309 binary.LittleEndian.PutUint32(expectedValue[8:], uint32(i+2))
310 d.Equal(expectedValue, val[0])
311 i++
312 }
313 d.EqualValues(i, int96md.NumValues())
314
315
316
317 if props.FileDecryptProps != nil {
318
319 colReader, err = rowGroupReader.Column(4)
320 if err != nil {
321 panic(err)
322 }
323 floatReader := colReader.(*file.Float32ColumnChunkReader)
324
325 floatmd, _ := rgMeta.ColumnChunk(4)
326
327 i = 0
328 for floatReader.HasNext() {
329 var value [1]float32
330
331
332 rowsRead, valuesRead, _ = floatReader.ReadBatch(1, value[:], nil, nil)
333
334 d.EqualValues(1, rowsRead)
335
336 d.EqualValues(1, valuesRead)
337
338 expectedValue := float32(i) * 1.1
339 d.Equal(expectedValue, value[0])
340 i++
341 }
342 d.EqualValues(i, floatmd.NumValues())
343
344
345 colReader, err = rowGroupReader.Column(5)
346 if err != nil {
347 panic(err)
348 }
349 dblReader := colReader.(*file.Float64ColumnChunkReader)
350
351 dblmd, _ := rgMeta.ColumnChunk(5)
352
353 i = 0
354 for dblReader.HasNext() {
355 var value [1]float64
356
357
358 rowsRead, valuesRead, _ = dblReader.ReadBatch(1, value[:], nil, nil)
359
360 d.EqualValues(1, rowsRead)
361
362 d.EqualValues(1, valuesRead)
363
364 expectedValue := float64(i) * 1.1111111
365 d.Equal(expectedValue, value[0])
366 i++
367 }
368 d.EqualValues(i, dblmd.NumValues())
369 }
370
371 colReader, err = rowGroupReader.Column(6)
372 if err != nil {
373 panic(err)
374 }
375 bareader := colReader.(*file.ByteArrayColumnChunkReader)
376
377 bamd, _ := rgMeta.ColumnChunk(6)
378
379 i = 0
380 for bareader.HasNext() {
381 var value [1]parquet.ByteArray
382 var def [1]int16
383
384 rowsRead, valuesRead, _ := bareader.ReadBatch(1, value[:], def[:], nil)
385 d.EqualValues(1, rowsRead)
386 expected := [10]byte{'p', 'a', 'r', 'q', 'u', 'e', 't', 0, 0, 0}
387 expected[7] = byte('0') + byte(i/100)
388 expected[8] = byte('0') + byte(i/10)%10
389 expected[9] = byte('0') + byte(i%10)
390 if i%2 == 0 {
391 d.Equal(1, valuesRead)
392 d.Len(value[0], 10)
393 d.EqualValues(expected[:], value[0])
394 d.EqualValues(1, def[0])
395 } else {
396 d.Zero(valuesRead)
397 d.Zero(def[0])
398 }
399 i++
400 }
401 d.EqualValues(i, bamd.NumValues())
402 }
403 }
404
405 func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, encryptionConfig uint) {
406 decFn := func() { d.decryptFile(fileName, int(decryptionConfig-1)) }
407
408
409
410 if encryptionConfig == 5 {
411 if decryptionConfig == 1 || decryptionConfig == 3 {
412 d.Panics(decFn)
413 return
414 }
415 }
416
417
418
419 if decryptionConfig == 2 {
420 if encryptionConfig != 5 && encryptionConfig != 4 {
421 d.Panics(decFn)
422 return
423 }
424 }
425
426
427 if decryptionConfig == 4 && encryptionConfig != 3 {
428 return
429 }
430 d.NotPanics(decFn)
431 }
432
433
434
435
436
437 func (d *TestDecryptionSuite) TestDecryption() {
438 tests := []struct {
439 file string
440 config uint
441 }{
442 {"uniform_encryption.parquet.encrypted", 1},
443 {"encrypt_columns_and_footer.parquet.encrypted", 2},
444 {"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
445 {"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
446 {"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
447 {"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
448 }
449 for _, tt := range tests {
450 d.Run(tt.file, func() {
451
452 tmpFile := path.Join(tempdir, "tmp_"+tt.file)
453 d.Require().FileExists(tmpFile)
454
455
456 for idx := range d.decryptionConfigs {
457 decConfig := idx + 1
458 d.checkResults(tmpFile, uint(decConfig), tt.config)
459 }
460 os.Remove(tmpFile)
461
462 file := path.Join(getDataDir(), tt.file)
463 d.Require().FileExists(file)
464
465 for idx := range d.decryptionConfigs {
466 decConfig := idx + 1
467 d.Run(fmt.Sprintf("config %d", decConfig), func() {
468 d.checkResults(file, uint(decConfig), tt.config)
469 })
470 }
471 })
472 }
473 }
474
View as plain text