// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.  See the License for the specific language governing
// permissions and limitations under the License.

package pqarrow_test

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"strings"
	"testing"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/compute"
	"github.com/apache/arrow/go/v15/arrow/memory"
	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/internal/testutils"
	"github.com/apache/arrow/go/v15/parquet/pqarrow"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
)

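// TestSingleColumnOptionalDictionaryWrite round-trips a dictionary-encoded
// array through a single optional column and checks that the values read
// back equal the original dense values.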
func (ps *ParquetIOTestSuite) TestSingleColumnOptionalDictionaryWrite() {
	for _, dt := range fullTypeList {
		// dictionary encoding is not supported for booleans, so skip them
		if dt.ID() == arrow.BOOL {
			continue
		}

		ps.Run(dt.Name(), func() {
			mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
			defer mem.AssertSize(ps.T(), 0)

			bldr := array.NewDictionaryBuilder(mem, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int16, ValueType: dt})
			defer bldr.Release()

			values := testutils.RandomNullable(dt, smallSize, 10)
			defer values.Release()
			ps.Require().NoError(bldr.AppendArray(values))

			arr := bldr.NewDictionaryArray()
			defer arr.Release()

			sc := ps.makeSimpleSchema(arr.DataType(), parquet.Repetitions.Optional)
			data := ps.writeColumn(mem, sc, arr)
			ps.readAndCheckSingleColumnFile(mem, data, values)
		})
	}
}

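// TestPqarrowDictionaries runs the write suite for both data page versions
// and the read suite once per null probability.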
func TestPqarrowDictionaries(t *testing.T) {
	suite.Run(t, &ArrowWriteDictionarySuite{dataPageVersion: parquet.DataPageV1})
	suite.Run(t, &ArrowWriteDictionarySuite{dataPageVersion: parquet.DataPageV2})
	testSuite := &ArrowReadDictSuite{}
	for _, np := range testSuite.NullProbabilities() {
		testSuite.nullProb = np
		t.Run(fmt.Sprintf("nullprob=%.2f", np), func(t *testing.T) {
			suite.Run(t, testSuite)
		})
	}
}

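// ArrowWriteDictionarySuite exercises dictionary-encoded writes for a
// particular Parquet data page version.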
type ArrowWriteDictionarySuite struct {
	suite.Suite

	dataPageVersion parquet.DataPageVersion
}

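// fromJSON constructs an array of type dt from its JSON representation,
// failing the suite immediately on error.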
func (ad *ArrowWriteDictionarySuite) fromJSON(mem memory.Allocator, dt arrow.DataType, data string) arrow.Array {
	arr, _, err := array.FromJSON(mem, dt, strings.NewReader(data))
	ad.Require().NoError(err)
	return arr
}

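// TestStatisticsWithFallback checks row-group and page-level statistics for
// dictionary-encoded columns; the duplicated values in the test dictionaries
// exercise the writer's fallback from dictionary to dense encoding.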
func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ad.T(), 0)

	testDictionaries := []arrow.Array{
		ad.fromJSON(mem, arrow.BinaryTypes.String, `["b", "c", "d", "a", "b", "c", "d", "a"]`),
		ad.fromJSON(mem, arrow.BinaryTypes.String, `["b", "c", "d", "a", "b", "c", "d", "a"]`),
		ad.fromJSON(mem, arrow.BinaryTypes.Binary, `["ZA==", "Yw==", "Yg==", "YQ==", "ZA==", "Yw==", "Yg==", "YQ=="]`),
		ad.fromJSON(mem, arrow.BinaryTypes.LargeString, `["a", "b", "c", "a", "b", "c"]`),
	}

	testIndices := []arrow.Array{
		// ["b", null, "a", "b", null, "a"]
		ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[0, null, 3, 0, null, 3]`),
		// ["b", "c", null, "b", "c", null]
		ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[0, 1, null, 0, 1, null]`),
		// ["d", "c", "a", "d", "c", "a"] (the base64 binary dictionary decodes to ["d", "c", "b", "a"])
		ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[0, 1, 3, 0, 1, 3]`),
		// all values null
		ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[null, null, null, null, null, null]`),
	}

	defer func() {
		for _, d := range testDictionaries {
			d.Release()
		}
		for _, i := range testIndices {
			i.Release()
		}
	}()

	// each 6-value column is written as two identical row groups of three
	// values, and the tiny batch and data page sizes split each row group
	// into data pages of at most two values, so the per-row-group
	// expectations below hold for both row groups
	expectedValidCounts := []int32{2, 2, 3, 0}
	expectedNullCounts := []int32{1, 1, 0, 3}
	expectedNumDataPages := []int{2, 2, 2, 1}
	expectedValidByPage := [][]int32{
		{1, 1},
		{2, 0},
		{2, 1},
		{0}}
	expectedNullByPage := [][]int64{
		{1, 0},
		{0, 1},
		{0, 0},
		{3}}
	expectedDictCounts := []int32{4, 4, 4, 3}

	expectedMinMax := [][2]string{
		{"a", "b"},
		{"b", "c"},
		{"a", "d"},
		// the all-null column has no min/max
		{"", ""}}

	expectedMinByPage := [][][]string{
		{{"b", "a"}, {"b", "a"}},
		{{"b", "b"}, {"b", "b"}},
		{{"c", "a"}, {"c", "a"}}}
	expectedMaxByPage := [][][]string{
		{{"b", "a"}, {"b", "a"}},
		{{"c", "c"}, {"c", "c"}},
		{{"d", "a"}, {"d", "a"}}}
	expectedHasMinMaxByPage := [][][]bool{
		{{true, true}, {true, true}},
		// the second page of each row group holds only nulls,
		// so it carries no min/max
		{{true, false}, {true, false}},
		{{true, true}, {true, true}},
		{{false}, {false}}}

	for caseIndex, dict := range testDictionaries {
		ad.Run(dict.DataType().String(), func() {
			dictType := &arrow.DictionaryType{
				IndexType: testIndices[caseIndex].DataType(),
				ValueType: dict.DataType(),
			}
			dictEncoded := array.NewDictionaryArray(dictType, testIndices[caseIndex], dict)
			defer dictEncoded.Release()
			schema := arrow.NewSchema([]arrow.Field{
				{Name: "values", Type: dictEncoded.DataType(), Nullable: true}}, nil)
			col := arrow.NewColumnFromArr(schema.Field(0), dictEncoded)
			defer col.Release()
			tbl := array.NewTable(schema, []arrow.Column{col}, int64(dictEncoded.Len()))
			defer tbl.Release()

			writerProperties := parquet.NewWriterProperties(
				parquet.WithMaxRowGroupLength(3),
				parquet.WithDataPageVersion(ad.dataPageVersion),
				parquet.WithBatchSize(2),
				parquet.WithDictionaryDefault(true),
				parquet.WithDataPageSize(2),
				parquet.WithStats(true),
			)

			var buf bytes.Buffer
			ad.Require().NoError(pqarrow.WriteTable(tbl, &buf, math.MaxInt64, writerProperties,
				pqarrow.DefaultWriterProps()))

			rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
			ad.Require().NoError(err)
			defer rdr.Close()

			metadata := rdr.MetaData()
			ad.Len(metadata.RowGroups, 2)

			// verify the row-group level statistics
			for i := 0; i < rdr.NumRowGroups(); i++ {
				rg := metadata.RowGroup(i)
				ad.EqualValues(1, rg.NumColumns())
				col, err := rg.ColumnChunk(0)
				ad.Require().NoError(err)
				stats, err := col.Statistics()
				ad.Require().NoError(err)

				ad.EqualValues(expectedValidCounts[caseIndex], stats.NumValues())
				ad.EqualValues(expectedNullCounts[caseIndex], stats.NullCount())

				caseExpectedMinMax := expectedMinMax[caseIndex]
				ad.Equal(caseExpectedMinMax[0], string(stats.EncodeMin()))
				ad.Equal(caseExpectedMinMax[1], string(stats.EncodeMax()))
			}

			// verify the page-level statistics in each row group: a
			// dictionary page comes first, then the expected data pages
			for rowGroup := 0; rowGroup < 2; rowGroup++ {
				pr, err := rdr.RowGroup(rowGroup).GetColumnPageReader(0)
				ad.Require().NoError(err)
				ad.True(pr.Next())
				page := pr.Page()
				ad.NotNil(page)
				ad.NoError(pr.Err())
				ad.Require().IsType((*file.DictionaryPage)(nil), page)
				dictPage := page.(*file.DictionaryPage)
				ad.EqualValues(expectedDictCounts[caseIndex], dictPage.NumValues())

				for pageIdx := 0; pageIdx < expectedNumDataPages[caseIndex]; pageIdx++ {
					ad.True(pr.Next())
					page = pr.Page()
					ad.NotNil(page)
					ad.NoError(pr.Err())

					dataPage, ok := page.(file.DataPage)
					ad.Require().True(ok)
					stats := dataPage.Statistics()
					ad.EqualValues(expectedNullByPage[caseIndex][pageIdx], stats.NullCount)

					expectHasMinMax := expectedHasMinMaxByPage[caseIndex][rowGroup][pageIdx]
					ad.Equal(expectHasMinMax, stats.HasMin)
					ad.Equal(expectHasMinMax, stats.HasMax)

					if expectHasMinMax {
						ad.Equal(expectedMinByPage[caseIndex][rowGroup][pageIdx], string(stats.Min))
						ad.Equal(expectedMaxByPage[caseIndex][rowGroup][pageIdx], string(stats.Max))
					}

					ad.EqualValues(expectedValidByPage[caseIndex][pageIdx]+int32(expectedNullByPage[caseIndex][pageIdx]),
						dataPage.NumValues())
				}

				ad.False(pr.Next())
			}
		})
	}
}

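// TestStatisticsUnifiedDictionary checks row-group statistics when two
// chunks share a single dictionary and a row group spans the chunk boundary.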
func (ad *ArrowWriteDictionarySuite) TestStatisticsUnifiedDictionary() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ad.T(), 0)

	// one table with two chunks that share a common dictionary
	var (
		tbl      arrow.Table
		dictType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32,
			ValueType: arrow.BinaryTypes.String}
		schema = arrow.NewSchema([]arrow.Field{
			{Name: "values", Type: dictType, Nullable: true}}, nil)
	)

	{
		// the dictionary deliberately contains no duplicate values, so the
		// writer can keep dictionary encoding rather than taking the dense
		// fallback path exercised by TestStatisticsWithFallback above
		testDictionary := ad.fromJSON(mem, arrow.BinaryTypes.String, `["b", "c", "d", "a"]`)
		defer testDictionary.Release()

		testIndices := []arrow.Array{
			// ["a", null, "a", "a", null, "a"]
			ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[3, null, 3, 3, null, 3]`),
			// ["b", "a", null, "b", null, "c"]
			ad.fromJSON(mem, arrow.PrimitiveTypes.Int32, `[0, 3, null, 0, null, 1]`),
		}
		chunks := []arrow.Array{
			array.NewDictionaryArray(dictType, testIndices[0], testDictionary),
			array.NewDictionaryArray(dictType, testIndices[1], testDictionary),
		}
		testIndices[0].Release()
		testIndices[1].Release()

		tbl = array.NewTableFromSlice(schema, [][]arrow.Array{chunks})
		defer tbl.Release()

		chunks[0].Release()
		chunks[1].Release()
	}

	var buf bytes.Buffer
	{
		// write the 12 rows as two row groups: the first 9 rows span both
		// chunks, the remaining 3 land in the second row group
		props := parquet.NewWriterProperties(
			parquet.WithMaxRowGroupLength(9),
			parquet.WithDataPageVersion(ad.dataPageVersion),
			parquet.WithBatchSize(3),
			parquet.WithDataPageSize(3),
			parquet.WithDictionaryDefault(true),
			parquet.WithStats(true))

		ad.Require().NoError(pqarrow.WriteTable(tbl, &buf, math.MaxInt64, props, pqarrow.DefaultWriterProps()))
	}

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	ad.Require().NoError(err)
	defer rdr.Close()

	metadata := rdr.MetaData()
	ad.Len(metadata.RowGroups, 2)
	ad.EqualValues(9, metadata.RowGroup(0).NumRows())
	ad.EqualValues(3, metadata.RowGroup(1).NumRows())

	col0, err := metadata.RowGroup(0).ColumnChunk(0)
	ad.Require().NoError(err)
	col1, err := metadata.RowGroup(1).ColumnChunk(0)
	ad.Require().NoError(err)

	stats0, err := col0.Statistics()
	ad.Require().NoError(err)
	stats1, err := col1.Statistics()
	ad.Require().NoError(err)

	ad.EqualValues(6, stats0.NumValues())
	ad.EqualValues(2, stats1.NumValues())
	ad.EqualValues(3, stats0.NullCount())
	ad.EqualValues(1, stats1.NullCount())
	ad.Equal([]byte("a"), stats0.EncodeMin())
	ad.Equal([]byte("b"), stats1.EncodeMin())
	ad.Equal([]byte("b"), stats0.EncodeMax())
	ad.Equal([]byte("c"), stats1.EncodeMax())
}

const numRowGroups = 16

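// ArrowReadDictSuite writes randomly generated, repetitive string data and
// reads it back either dense or dictionary-encoded at a given null
// probability.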
type ArrowReadDictSuite struct {
	suite.Suite

	mem *memory.CheckedAllocator

	denseVals     arrow.Array
	expectedDense arrow.Table
	props         pqarrow.ArrowReadProperties
	nullProb      float64

	buf bytes.Buffer

	options struct {
		numRows      int
		numRowGroups int
		numUniques   int
	}
}

358
359 func (ar *ArrowReadDictSuite) generateData(nullProb float64) {
360 const minLen = 2
361 const maxLen = 100
362 rag := testutils.NewRandomArrayGenerator(0)
363
364 ar.denseVals = rag.StringWithRepeats(ar.mem, int64(ar.options.numRows),
365 int64(ar.options.numUniques), minLen, maxLen, nullProb)
366
367 chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{ar.denseVals})
368 defer chunked.Release()
369 ar.expectedDense = makeSimpleTable(chunked, true)
370 }
371
372 func (ar *ArrowReadDictSuite) SetupTest() {
373 ar.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
374 ar.buf.Reset()
375
376 ar.options = struct {
377 numRows int
378 numRowGroups int
379 numUniques int
380 }{1024 * numRowGroups, numRowGroups, 128}
381
382 ar.props = pqarrow.ArrowReadProperties{}
383 ar.generateData(ar.nullProb)
384 }
385
386 func (ar *ArrowReadDictSuite) TearDownTest() {
387 if ar.denseVals != nil {
388 ar.denseVals.Release()
389 }
390 ar.expectedDense.Release()
391
392 ar.mem.AssertSize(ar.T(), 0)
393 }
394
395 func (ar *ArrowReadDictSuite) writeSimple() {
396
397
398 ar.Require().NoError(pqarrow.WriteTable(ar.expectedDense, &ar.buf, int64(ar.options.numRows/ar.options.numRowGroups),
399 parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true)),
400 pqarrow.DefaultWriterProps()))
401 }
402
403 func (*ArrowReadDictSuite) NullProbabilities() []float64 {
404 return []float64{0.0, 0.5, 1}
405 }
406
407 func (ar *ArrowReadDictSuite) checkReadWholeFile(expected arrow.Table) {
408 tbl, err := pqarrow.ReadTable(context.Background(),
409 bytes.NewReader(ar.buf.Bytes()), nil, ar.props, ar.mem)
410 ar.Require().NoError(err)
411 defer tbl.Release()
412
413 ar.Truef(array.TableEqual(expected, tbl), "expected: %s\ngot: %s", expected, tbl)
414 }
415
416 func (ar *ArrowReadDictSuite) checkStreamReadWholeFile(expected arrow.Table) {
417 reader, err := file.NewParquetReader(bytes.NewReader(ar.buf.Bytes()))
418 ar.Require().NoError(err)
419 defer reader.Close()
420
421 rdr, err := pqarrow.NewFileReader(reader, ar.props, ar.mem)
422 ar.Require().NoError(err)
423
424 rrdr, err := rdr.GetRecordReader(context.Background(), nil, nil)
425 ar.Require().NoError(err)
426 defer rrdr.Release()
427
428 recs := make([]arrow.Record, 0)
429 for rrdr.Next() {
430 rec := rrdr.Record()
431 rec.Retain()
432 defer rec.Release()
433 recs = append(recs, rec)
434 }
435
436 tbl := array.NewTableFromRecords(rrdr.Schema(), recs)
437 defer tbl.Release()
438
439 ar.Truef(array.TableEqual(expected, tbl), "expected: %s\ngot: %s", expected, tbl)
440 }
441
442 func (ar *ArrowReadDictSuite) getReader() *pqarrow.FileReader {
443 reader, err := file.NewParquetReader(bytes.NewReader(ar.buf.Bytes()))
444 ar.Require().NoError(err)
445
446 rdr, err := pqarrow.NewFileReader(reader, ar.props, ar.mem)
447 ar.Require().NoError(err)
448 return rdr
449 }
450
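// asDict32Encoded re-encodes a string array as a dictionary array with
// int32 indices.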
func asDict32Encoded(mem memory.Allocator, arr arrow.Array) arrow.Array {
	bldr := array.NewDictionaryBuilder(mem, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.String})
	defer bldr.Release()
	bldr.AppendArray(arr)
	return bldr.NewArray()
}

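// TestReadWholeFileDict reads the whole file back with dictionary decoding
// enabled for column 0 and compares against the dense values re-encoded as
// one dictionary chunk per row group.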
func (ar *ArrowReadDictSuite) TestReadWholeFileDict() {
	ar.props.SetReadDict(0, true)
	ar.writeSimple()

	numRowGroups := ar.options.numRowGroups
	chunkSize := ar.options.numRows / ar.options.numRowGroups

	// the expected table re-encodes each row group's slice of the dense
	// values as its own dictionary chunk
	chunks := make([]arrow.Array, numRowGroups)
	for i := 0; i < numRowGroups; i++ {
		start := int64(chunkSize * i)
		sl := array.NewSlice(ar.denseVals, start, start+int64(chunkSize))
		defer sl.Release()
		chunks[i] = asDict32Encoded(ar.mem, sl)
		defer chunks[i].Release()
	}

	chunked := arrow.NewChunked(chunks[0].DataType(), chunks)
	defer chunked.Release()

	exTable := makeSimpleTable(chunked, true)
	defer exTable.Release()

	ar.checkReadWholeFile(exTable)
}

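// TestZeroChunksListOfDictionary writes an empty list-of-strings column and
// verifies that reading it yields a single, zero-length chunk.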
func (ar *ArrowReadDictSuite) TestZeroChunksListOfDictionary() {
	ar.props.SetReadDict(0, true)
	ar.denseVals.Release()
	ar.denseVals = nil

	values := arrow.NewChunked(arrow.ListOf(arrow.BinaryTypes.String), []arrow.Array{})
	defer values.Release()

	ar.options.numRowGroups = 1
	ar.options.numRows = 0
	ar.options.numUniques = 0
	ar.expectedDense.Release()
	ar.expectedDense = makeSimpleTable(values, false)

	ar.writeSimple()

	rdr := ar.getReader()
	defer rdr.ParquetReader().Close()

	colReader, err := rdr.GetColumn(context.Background(), 0)
	ar.Require().NoError(err)
	defer colReader.Release()

	chnked, err := colReader.NextBatch(1 << 15)
	ar.Require().NoError(err)
	defer chnked.Release()
	ar.Zero(chnked.Len())
	ar.Len(chnked.Chunks(), 1)
}

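// TestIncrementalReads consumes the dictionary column in four batches and
// compares each batch against the matching slice of the dense source data.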
func (ar *ArrowReadDictSuite) TestIncrementalReads() {
	ar.options.numRows = 100
	ar.options.numUniques = 10

	ar.denseVals.Release()
	ar.expectedDense.Release()
	ar.generateData(ar.nullProb)

	ar.props.SetReadDict(0, true)

	// write everything into a single row group
	ar.Require().NoError(pqarrow.WriteTable(ar.expectedDense, &ar.buf, int64(ar.options.numRows),
		parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true)),
		pqarrow.DefaultWriterProps()))

	// sanity check: the file reads back as a whole table
	expected, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(ar.buf.Bytes()), nil, ar.props, ar.mem)
	ar.Require().NoError(err)
	defer expected.Release()

	rdr := ar.getReader()
	defer rdr.ParquetReader().Close()
	col, err := rdr.GetColumn(context.Background(), 0)
	ar.Require().NoError(err)
	defer col.Release()

	const numReads = 4
	batchSize := ar.options.numRows / numReads

	ctx := compute.WithAllocator(context.Background(), ar.mem)

	for i := 0; i < numReads; i++ {
		chunk, err := col.NextBatch(int64(batchSize))
		ar.Require().NoError(err)
		defer chunk.Release()

		// cast the dictionary-encoded chunk back to a dense string array
		// so it can be compared with the matching slice of the source data
		resultDense, err := compute.CastArray(ctx, chunk.Chunk(0),
			compute.SafeCastOptions(arrow.BinaryTypes.String))
		ar.Require().NoError(err)
		defer resultDense.Release()

		sl := array.NewSlice(ar.denseVals, int64(i*batchSize), int64((i*batchSize)+batchSize))
		defer sl.Release()

		ar.Truef(array.Equal(sl, resultDense), "expected: %s\ngot: %s", sl, resultDense)
	}
}

561
562 func (ar *ArrowReadDictSuite) TestStreamReadWholeFileDict() {
563 ar.options.numRows = 100
564 ar.options.numUniques = 10
565
566 ar.denseVals.Release()
567 ar.expectedDense.Release()
568 ar.generateData(ar.nullProb)
569
570 ar.writeSimple()
571 ar.props.BatchSize = int64(ar.options.numRows * 2)
572 ar.checkStreamReadWholeFile(ar.expectedDense)
573 }
574
575 func (ar *ArrowReadDictSuite) TestReadWholeFileDense() {
576 ar.props.SetReadDict(0, false)
577 ar.writeSimple()
578 ar.checkReadWholeFile(ar.expectedDense)
579 }
580
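// doRoundTrip writes tbl to an in-memory Parquet buffer with the given
// writer properties and reads it back with the given read properties.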
func doRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64, wrProps *parquet.WriterProperties, arrWrProps *pqarrow.ArrowWriterProperties, arrReadProps pqarrow.ArrowReadProperties) arrow.Table {
	var buf bytes.Buffer
	require.NoError(t, pqarrow.WriteTable(tbl, &buf, rowGroupSize, wrProps, *arrWrProps))

	out, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(buf.Bytes()), nil, arrReadProps, wrProps.Allocator())
	require.NoError(t, err)
	return out
}

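// TestArrowWriteChangingDictionaries writes a table of ten dictionary chunks
// that each carry a different dictionary and expects the round trip to
// produce the original dense values.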
func TestArrowWriteChangingDictionaries(t *testing.T) {
	const (
		numUnique            = 50
		repeat               = 5000
		minLen, maxLen int32 = 2, 20
	)

	rag := testutils.NewRandomArrayGenerator(0)
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	values := rag.StringWithRepeats(mem, repeat*numUnique, numUnique, minLen, maxLen, 0.1)
	defer values.Release()

	valuesChunk := arrow.NewChunked(values.DataType(), []arrow.Array{values})
	defer valuesChunk.Release()

	expected := makeSimpleTable(valuesChunk, true)
	defer expected.Release()

	// encode the values as ten dictionary chunks, each with its own dictionary
	const numChunks = 10
	chunks := make([]arrow.Array, numChunks)
	chunkSize := valuesChunk.Len() / numChunks
	for i := 0; i < numChunks; i++ {
		start := int64(chunkSize * i)
		sl := array.NewSlice(values, start, start+int64(chunkSize))
		defer sl.Release()
		chunks[i] = asDict32Encoded(mem, sl)
		defer chunks[i].Release()
	}

	dictChunked := arrow.NewChunked(chunks[0].DataType(), chunks)
	defer dictChunked.Release()
	dictTable := makeSimpleTable(dictChunked, true)
	defer dictTable.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
	actual := doRoundTrip(t, dictTable, int64(values.Len())/2, parquet.NewWriterProperties(parquet.WithAllocator(mem)),
		&props, pqarrow.ArrowReadProperties{})
	defer actual.Release()

	// without a stored schema the table reads back dense, matching the
	// original values
	assert.Truef(t, array.TableEqual(expected, actual), "expected: %s\ngot: %s", expected, actual)
}

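// TestArrowAutoReadAsDictionary checks that storing the Arrow schema in the
// file metadata makes the reader return dictionary-encoded data, while
// omitting it yields dense data.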
func TestArrowAutoReadAsDictionary(t *testing.T) {
	const (
		numUnique            = 50
		repeat               = 100
		minLen, maxLen int32 = 2, 20
	)

	rag := testutils.NewRandomArrayGenerator(0)
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	values := rag.StringWithRepeats(mem, repeat*numUnique, numUnique, minLen, maxLen, 0.1)
	defer values.Release()

	dictValues := asDict32Encoded(mem, values)
	defer dictValues.Release()

	dictChunk := arrow.NewChunked(dictValues.DataType(), []arrow.Array{dictValues})
	defer dictChunk.Release()

	valuesChunk := arrow.NewChunked(values.DataType(), []arrow.Array{values})
	defer valuesChunk.Release()

	expected := makeSimpleTable(dictChunk, true)
	defer expected.Release()
	expectedDense := makeSimpleTable(valuesChunk, true)
	defer expectedDense.Release()

	wrProps := parquet.NewWriterProperties(parquet.WithAllocator(mem), parquet.WithDictionaryDefault(true))

	// with the Arrow schema stored in the file metadata, the column reads
	// back dictionary-encoded
	propsStoreSchema := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
	actual := doRoundTrip(t, expected, int64(valuesChunk.Len()), wrProps, &propsStoreSchema, pqarrow.ArrowReadProperties{})
	defer actual.Release()

	assert.Truef(t, array.TableEqual(expected, actual), "expected: %s\ngot: %s", expected, actual)

	// without the stored schema, the same data reads back dense
	propsNoStoreSchema := pqarrow.NewArrowWriterProperties()
	actualDense := doRoundTrip(t, expected, int64(valuesChunk.Len()), wrProps, &propsNoStoreSchema, pqarrow.ArrowReadProperties{})
	defer actualDense.Release()

	assert.Truef(t, array.TableEqual(expectedDense, actualDense), "expected: %s\ngot: %s", expectedDense, actualDense)
}

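// TestArrowWriteNestedSubfieldDictionary round-trips a list column whose
// value child is dictionary-encoded, relying on the stored Arrow schema to
// reconstruct the nested dictionary type on read.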
func TestArrowWriteNestedSubfieldDictionary(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	// offsets [0, 0, 2, 3] delimit the lists [], ["foo", "foo"], ["foo"]
	offsets, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, strings.NewReader(`[0, 0, 2, 3]`))
	defer offsets.Release()
	indices, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int32, strings.NewReader(`[0, 0, 0]`))
	defer indices.Release()
	dict, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String, strings.NewReader(`["foo"]`))
	defer dict.Release()

	dictType := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.String}
	dictValues := array.NewDictionaryArray(dictType, indices, dict)
	defer dictValues.Release()

	// assemble the list<dictionary<...>> array manually from the offsets
	// buffer and the dictionary child data
	data := array.NewData(arrow.ListOf(dictType), 3, []*memory.Buffer{nil, offsets.Data().Buffers()[1]},
		[]arrow.ArrayData{dictValues.Data()}, 0, 0)
	defer data.Release()
	values := array.NewListData(data)
	defer values.Release()

	chk := arrow.NewChunked(values.DataType(), []arrow.Array{values})
	defer chk.Release()

	tbl := makeSimpleTable(chk, true)
	defer tbl.Release()
	propsStoreSchema := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
	actual := doRoundTrip(t, tbl, int64(values.Len()), parquet.NewWriterProperties(), &propsStoreSchema, pqarrow.ArrowReadProperties{})
	defer actual.Release()

	assert.Truef(t, array.TableEqual(tbl, actual), "expected: %s\ngot: %s", tbl, actual)
}

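// TestDictOfEmptyStringsRoundtrip writes six empty (non-null) strings
// through a dictionary-encoded column and checks that they read back intact
// as a plain string column.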
func TestDictOfEmptyStringsRoundtrip(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	schema := arrow.NewSchema([]arrow.Field{
		{Name: "reserved1", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)

	bldr := array.NewStringBuilder(mem)
	defer bldr.Release()

	for i := 0; i < 6; i++ {
		bldr.AppendEmptyValue()
	}

	arr := bldr.NewArray()
	defer arr.Release()
	col1 := arrow.NewColumnFromArr(schema.Field(0), arr)
	defer col1.Release()
	tbl := array.NewTable(schema, []arrow.Column{col1}, 6)
	defer tbl.Release()

	var buf bytes.Buffer
	require.NoError(t, pqarrow.WriteTable(tbl, &buf, 6,
		parquet.NewWriterProperties(parquet.WithDictionaryDefault(true)),
		pqarrow.NewArrowWriterProperties()))

	result, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(buf.Bytes()), nil, pqarrow.ArrowReadProperties{}, mem)
	require.NoError(t, err)
	defer result.Release()

	assert.EqualValues(t, 6, result.NumRows())
	assert.EqualValues(t, 1, result.NumCols())
	col := result.Column(0).Data().Chunk(0)
	assert.Equal(t, arrow.STRING, col.DataType().ID())

	for i := 0; i < 6; i++ {
		assert.Zero(t, col.(*array.String).Value(i))
	}
}