// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 package pqarrow_test
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "math"
24 "strconv"
25 "strings"
26 "testing"
27
28 "github.com/apache/arrow/go/v15/arrow"
29 "github.com/apache/arrow/go/v15/arrow/array"
30 "github.com/apache/arrow/go/v15/arrow/bitutil"
31 "github.com/apache/arrow/go/v15/arrow/decimal128"
32 "github.com/apache/arrow/go/v15/arrow/decimal256"
33 "github.com/apache/arrow/go/v15/arrow/ipc"
34 "github.com/apache/arrow/go/v15/arrow/memory"
35 "github.com/apache/arrow/go/v15/internal/types"
36 "github.com/apache/arrow/go/v15/internal/utils"
37 "github.com/apache/arrow/go/v15/parquet"
38 "github.com/apache/arrow/go/v15/parquet/compress"
39 "github.com/apache/arrow/go/v15/parquet/file"
40 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
41 "github.com/apache/arrow/go/v15/parquet/internal/testutils"
42 "github.com/apache/arrow/go/v15/parquet/pqarrow"
43 "github.com/apache/arrow/go/v15/parquet/schema"
44 "github.com/google/uuid"
45 "github.com/stretchr/testify/assert"
46 "github.com/stretchr/testify/require"
47 "github.com/stretchr/testify/suite"
48 )
49
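// makeSimpleTable wraps the given chunked values in a one-column table named "col",
// tagging the field with a placeholder PARQUET:field_id of -1.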
50 func makeSimpleTable(values *arrow.Chunked, nullable bool) arrow.Table {
51 sc := arrow.NewSchema([]arrow.Field{{Name: "col", Type: values.DataType(), Nullable: nullable,
52 Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})}}, nil)
53 column := arrow.NewColumn(sc.Field(0), values)
54 defer column.Release()
55 return array.NewTable(sc, []arrow.Column{*column}, -1)
56 }
57
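// makeDateTimeTypesTable builds a six-row table of date/time columns (f0-f6) with one
// null row. With expected=true, f3 uses microsecond rather than nanosecond precision,
// matching how the writer coerces nanosecond timestamps for older Parquet versions
// (TestWriteArrowCols writes with V2_4); addFieldMeta attaches PARQUET:field_id
// metadata to every field.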
58 func makeDateTimeTypesTable(mem memory.Allocator, expected bool, addFieldMeta bool) arrow.Table {
59 isValid := []bool{true, true, true, false, true, true}
60
61
62 f0 := arrow.Field{Name: "f0", Type: arrow.FixedWidthTypes.Date32, Nullable: true}
63 f1 := arrow.Field{Name: "f1", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true}
64 f2 := arrow.Field{Name: "f2", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true}
65 f3 := arrow.Field{Name: "f3", Type: arrow.FixedWidthTypes.Timestamp_ns, Nullable: true}
66 f3X := arrow.Field{Name: "f3", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true}
67 f4 := arrow.Field{Name: "f4", Type: arrow.FixedWidthTypes.Time32ms, Nullable: true}
68 f5 := arrow.Field{Name: "f5", Type: arrow.FixedWidthTypes.Time64us, Nullable: true}
69 f6 := arrow.Field{Name: "f6", Type: arrow.FixedWidthTypes.Time64ns, Nullable: true}
70
71 fieldList := []arrow.Field{f0, f1, f2}
72 if expected {
73 fieldList = append(fieldList, f3X)
74 } else {
75 fieldList = append(fieldList, f3)
76 }
77 fieldList = append(fieldList, f4, f5, f6)
78
79 if addFieldMeta {
80 for idx := range fieldList {
81 fieldList[idx].Metadata = arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{strconv.Itoa(idx + 1)})
82 }
83 }
84 arrsc := arrow.NewSchema(fieldList, nil)
85
86 d32Values := []arrow.Date32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
87 ts64nsValues := []arrow.Timestamp{1489269000000, 1489270000000, 1489271000000, 1489272000000, 1489272000000, 1489273000000}
88 ts64usValues := []arrow.Timestamp{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
89 ts64msValues := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272, 1489272, 1489273}
90 t32Values := []arrow.Time32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
91 t64nsValues := []arrow.Time64{1489269000000, 1489270000000, 1489271000000, 1489272000000, 1489272000000, 1489273000000}
92 t64usValues := []arrow.Time64{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
93
94 builders := make([]array.Builder, 0, len(fieldList))
95 for _, f := range fieldList {
96 bldr := array.NewBuilder(mem, f.Type)
97 defer bldr.Release()
98 builders = append(builders, bldr)
99 }
100
101 builders[0].(*array.Date32Builder).AppendValues(d32Values, isValid)
102 builders[1].(*array.TimestampBuilder).AppendValues(ts64msValues, isValid)
103 builders[2].(*array.TimestampBuilder).AppendValues(ts64usValues, isValid)
104 if expected {
105 builders[3].(*array.TimestampBuilder).AppendValues(ts64usValues, isValid)
106 } else {
107 builders[3].(*array.TimestampBuilder).AppendValues(ts64nsValues, isValid)
108 }
109 builders[4].(*array.Time32Builder).AppendValues(t32Values, isValid)
110 builders[5].(*array.Time64Builder).AppendValues(t64usValues, isValid)
111 builders[6].(*array.Time64Builder).AppendValues(t64nsValues, isValid)
112
113 cols := make([]arrow.Column, 0, len(fieldList))
114 for idx, field := range fieldList {
115 arr := builders[idx].NewArray()
116 defer arr.Release()
117
118 chunked := arrow.NewChunked(field.Type, []arrow.Array{arr})
119 defer chunked.Release()
120 col := arrow.NewColumn(field, chunked)
121 defer col.Release()
122 cols = append(cols, *col)
123 }
124
125 return array.NewTable(arrsc, cols, int64(len(isValid)))
126 }
127
128 func TestWriteArrowCols(t *testing.T) {
129 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
130 defer mem.AssertSize(t, 0)
131
132 tbl := makeDateTimeTypesTable(mem, false, false)
133 defer tbl.Release()
134
135 sink := encoding.NewBufferWriter(0, mem)
136 defer sink.Release()
137
138 fileWriter, err := pqarrow.NewFileWriter(
139 tbl.Schema(),
140 sink,
141 parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)),
142 pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)),
143 )
144 require.NoError(t, err)
145
146 fileWriter.NewRowGroup()
147 for i := int64(0); i < tbl.NumCols(); i++ {
148 colChunk := tbl.Column(int(i)).Data()
149 err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
150 require.NoError(t, err)
151 }
152 require.NoError(t, fileWriter.Close())
153
154 expected := makeDateTimeTypesTable(mem, true, false)
155 defer expected.Release()
156
157 reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
158 require.NoError(t, err)
159
160 assert.EqualValues(t, expected.NumCols(), reader.MetaData().Schema.NumColumns())
161 assert.EqualValues(t, expected.NumRows(), reader.NumRows())
162 assert.EqualValues(t, 1, reader.NumRowGroups())
163
164 rgr := reader.RowGroup(0)
165
166 for i := 0; i < int(expected.NumCols()); i++ {
167 var (
168 total int64
169 read int
170 defLevelsOut = make([]int16, int(expected.NumRows()))
171 arr = expected.Column(i).Data().Chunk(0)
172 )
173 switch expected.Schema().Field(i).Type.(arrow.FixedWidthDataType).BitWidth() {
174 case 32:
175 col, err := rgr.Column(i)
176 assert.NoError(t, err)
177 colReader := col.(*file.Int32ColumnChunkReader)
178 vals := make([]int32, int(expected.NumRows()))
179 total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
180 require.NoError(t, err)
181
182 nulls := 0
183 for j := 0; j < arr.Len(); j++ {
184 if arr.IsNull(j) {
185 nulls++
186 continue
187 }
188
189 switch v := arr.(type) {
190 case *array.Date32:
191 assert.EqualValues(t, v.Value(j), vals[j-nulls])
192 case *array.Time32:
193 assert.EqualValues(t, v.Value(j), vals[j-nulls])
194 }
195 }
196 case 64:
197 col, err := rgr.Column(i)
198 assert.NoError(t, err)
199 colReader := col.(*file.Int64ColumnChunkReader)
200 vals := make([]int64, int(expected.NumRows()))
201 total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
202 require.NoError(t, err)
203
204 nulls := 0
205 for j := 0; j < arr.Len(); j++ {
206 if arr.IsNull(j) {
207 nulls++
208 continue
209 }
210
211 switch v := arr.(type) {
212 case *array.Date64:
213 assert.EqualValues(t, v.Value(j), vals[j-nulls])
214 case *array.Time64:
215 assert.EqualValues(t, v.Value(j), vals[j-nulls])
216 case *array.Timestamp:
217 assert.EqualValues(t, v.Value(j), vals[j-nulls])
218 }
219 }
220 }
221 assert.EqualValues(t, expected.NumRows(), total)
222 assert.EqualValues(t, expected.NumRows()-1, read)
223 assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLevelsOut)
224 }
225 }
226
227 func TestWriteArrowInt96(t *testing.T) {
228 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
229 defer mem.AssertSize(t, 0)
230
231 tbl := makeDateTimeTypesTable(mem, false, false)
232 defer tbl.Release()
233
234 sink := encoding.NewBufferWriter(0, mem)
235 defer sink.Release()
236
237 fileWriter, err := pqarrow.NewFileWriter(
238 tbl.Schema(),
239 sink,
240 parquet.NewWriterProperties(parquet.WithAllocator(mem)),
241 pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)),
242 )
243 require.NoError(t, err)
244
245 fileWriter.NewRowGroup()
246 for i := int64(0); i < tbl.NumCols(); i++ {
247 colChunk := tbl.Column(int(i)).Data()
248 err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
249 require.NoError(t, err)
250 }
251 require.NoError(t, fileWriter.Close())
252
253 expected := makeDateTimeTypesTable(mem, false, false)
254 defer expected.Release()
255
256 reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
257 require.NoError(t, err)
258
259 assert.EqualValues(t, expected.NumCols(), reader.MetaData().Schema.NumColumns())
260 assert.EqualValues(t, expected.NumRows(), reader.NumRows())
261 assert.EqualValues(t, 1, reader.NumRowGroups())
262
263 rgr := reader.RowGroup(0)
264 tsRdr, err := rgr.Column(3)
265 assert.NoError(t, err)
266 assert.Equal(t, parquet.Types.Int96, tsRdr.Type())
267
268 rdr := tsRdr.(*file.Int96ColumnChunkReader)
269 vals := make([]parquet.Int96, expected.NumRows())
270 defLevels := make([]int16, int(expected.NumRows()))
271
272 total, read, _ := rdr.ReadBatch(expected.NumRows(), vals, defLevels, nil)
273 assert.EqualValues(t, expected.NumRows(), total)
274 assert.EqualValues(t, expected.NumRows()-1, read)
275 assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLevels)
276
277 data := expected.Column(3).Data().Chunk(0).(*array.Timestamp)
278 assert.EqualValues(t, data.Value(0), vals[0].ToTime().UnixNano())
279 assert.EqualValues(t, data.Value(1), vals[1].ToTime().UnixNano())
280 assert.EqualValues(t, data.Value(2), vals[2].ToTime().UnixNano())
281 assert.EqualValues(t, data.Value(4), vals[3].ToTime().UnixNano())
282 assert.EqualValues(t, data.Value(5), vals[4].ToTime().UnixNano())
283 }
284
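// writeTableToBuffer writes tbl to an in-memory Parquet file, splitting it into row
// groups of at most rowGroupSize rows, and returns the resulting buffer.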
285 func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
286 sink := encoding.NewBufferWriter(0, mem)
287 defer sink.Release()
288
289 fileWriter, err := pqarrow.NewFileWriter(
290 tbl.Schema(),
291 sink,
292 parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)),
293 props,
294 )
295 require.NoError(t, err)
296
297 offset := int64(0)
298 for offset < tbl.NumRows() {
299 sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
300 fileWriter.NewRowGroup()
301 for i := 0; i < int(tbl.NumCols()); i++ {
302 colChunk := tbl.Column(i).Data()
			err := fileWriter.WriteColumnChunked(colChunk, offset, sz)
304 require.NoError(t, err)
305 }
306 offset += sz
307 }
308
309 require.NoError(t, fileWriter.Close())
310 return sink.Finish()
311 }
312
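// simpleRoundTrip writes tbl to Parquet with the given row group size, reads every
// column back, and checks each original chunk against the corresponding slice of the
// data that was read.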
313 func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
314 t.Helper()
315 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
316 defer mem.AssertSize(t, 0)
317
318 buf := writeTableToBuffer(t, mem, tbl, rowGroupSize, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
319 defer buf.Release()
320
321 rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
322 require.NoError(t, err)
323
324 ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
325 require.NoError(t, err)
326
327 for i := 0; i < int(tbl.NumCols()); i++ {
328 crdr, err := ardr.GetColumn(context.TODO(), i)
329 require.NoError(t, err)
330
331 chunked, err := crdr.NextBatch(tbl.NumRows())
332 require.NoError(t, err)
333 defer chunked.Release()
334
335 require.EqualValues(t, tbl.NumRows(), chunked.Len())
336
337 chunkList := tbl.Column(i).Data().Chunks()
338 offset := int64(0)
339 for _, chnk := range chunkList {
340 slc := array.NewChunkedSlice(chunked, offset, offset+int64(chnk.Len()))
341 defer slc.Release()
342
343 assert.EqualValues(t, chnk.Len(), slc.Len())
344 if len(slc.Chunks()) == 1 {
345 offset += int64(chnk.Len())
346 assert.True(t, array.Equal(chnk, slc.Chunk(0)))
347 }
348 }
349 crdr.Release()
350 }
351 }
352
353 func TestWriteKeyValueMetadata(t *testing.T) {
354 kv := map[string]string{
355 "key1": "value1",
356 "key2": "value2",
357 "key3": "value3",
358 }
359
360 sc := arrow.NewSchema([]arrow.Field{
361 {Name: "int32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
362 }, nil)
363 bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
364 defer bldr.Release()
365 for _, b := range bldr.Fields() {
366 b.AppendNull()
367 }
368
369 rec := bldr.NewRecord()
370 defer rec.Release()
371
372 props := parquet.NewWriterProperties(
373 parquet.WithVersion(parquet.V1_0),
374 )
375 var buf bytes.Buffer
376 fw, err := pqarrow.NewFileWriter(sc, &buf, props, pqarrow.DefaultWriterProps())
377 require.NoError(t, err)
378 err = fw.Write(rec)
379 require.NoError(t, err)
380
381 for key, value := range kv {
382 require.NoError(t, fw.AppendKeyValueMetadata(key, value))
383 }
384
385 err = fw.Close()
386 require.NoError(t, err)
387
388 reader, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
389 require.NoError(t, err)
390
391 for key, value := range kv {
392 got := reader.MetaData().KeyValueMetadata().FindValue(key)
393 require.NotNil(t, got)
394 assert.Equal(t, value, *got)
395 }
396 }
397
398 func TestWriteEmptyLists(t *testing.T) {
399 sc := arrow.NewSchema([]arrow.Field{
400 {Name: "f1", Type: arrow.ListOf(arrow.FixedWidthTypes.Date32)},
401 {Name: "f2", Type: arrow.ListOf(arrow.FixedWidthTypes.Date64)},
402 {Name: "f3", Type: arrow.ListOf(arrow.FixedWidthTypes.Timestamp_us)},
403 {Name: "f4", Type: arrow.ListOf(arrow.FixedWidthTypes.Timestamp_ms)},
404 {Name: "f5", Type: arrow.ListOf(arrow.FixedWidthTypes.Time32ms)},
405 {Name: "f6", Type: arrow.ListOf(arrow.FixedWidthTypes.Time64ns)},
406 {Name: "f7", Type: arrow.ListOf(arrow.FixedWidthTypes.Time64us)},
407 }, nil)
408 bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
409 defer bldr.Release()
410 for _, b := range bldr.Fields() {
411 b.AppendNull()
412 }
413
414 rec := bldr.NewRecord()
415 defer rec.Release()
416
417 props := parquet.NewWriterProperties(
418 parquet.WithVersion(parquet.V1_0),
419 )
420 arrprops := pqarrow.DefaultWriterProps()
421 var buf bytes.Buffer
422 fw, err := pqarrow.NewFileWriter(sc, &buf, props, arrprops)
423 require.NoError(t, err)
424 err = fw.Write(rec)
425 require.NoError(t, err)
426 err = fw.Close()
427 require.NoError(t, err)
428 }
429
430 func TestArrowReadWriteTableChunkedCols(t *testing.T) {
431 chunkSizes := []int{2, 4, 10, 2}
432 const totalLen = int64(18)
433
434 rng := testutils.NewRandomArrayGenerator(0)
435
436 arr := rng.Int32(totalLen, 0, math.MaxInt32/2, 0.9)
437 defer arr.Release()
438
439 offset := int64(0)
440 chunks := make([]arrow.Array, 0)
	for _, chnksize := range chunkSizes {
		chk := array.NewSlice(arr, offset, offset+int64(chnksize))
		defer chk.Release()
		chunks = append(chunks, chk)
		offset += int64(chnksize)
	}
447
448 sc := arrow.NewSchema([]arrow.Field{{Name: "field", Type: arr.DataType(), Nullable: true}}, nil)
449
450 chk := arrow.NewChunked(arr.DataType(), chunks)
451 defer chk.Release()
452
453 tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0), chk)}, -1)
454 defer tbl.Release()
455
456 simpleRoundTrip(t, tbl, 2)
457 simpleRoundTrip(t, tbl, 10)
458 }
459
460
461
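// getLogicalType maps an Arrow type to the Parquet logical type the writer is
// expected to produce for it.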
462 func getLogicalType(typ arrow.DataType) schema.LogicalType {
463 switch typ.ID() {
464 case arrow.DICTIONARY:
465 return getLogicalType(typ.(*arrow.DictionaryType).ValueType)
466 case arrow.INT8:
467 return schema.NewIntLogicalType(8, true)
468 case arrow.UINT8:
469 return schema.NewIntLogicalType(8, false)
470 case arrow.INT16:
471 return schema.NewIntLogicalType(16, true)
472 case arrow.UINT16:
473 return schema.NewIntLogicalType(16, false)
474 case arrow.INT32:
475 return schema.NewIntLogicalType(32, true)
476 case arrow.UINT32:
477 return schema.NewIntLogicalType(32, false)
478 case arrow.INT64:
479 return schema.NewIntLogicalType(64, true)
480 case arrow.UINT64:
481 return schema.NewIntLogicalType(64, false)
482 case arrow.STRING, arrow.LARGE_STRING:
483 return schema.StringLogicalType{}
	case arrow.DATE32, arrow.DATE64:
		return schema.DateLogicalType{}
488 case arrow.FLOAT16:
489 return schema.Float16LogicalType{}
490 case arrow.TIMESTAMP:
491 ts := typ.(*arrow.TimestampType)
		// a timestamp carrying a timezone is UTC-normalized, so it maps to an adjusted-to-UTC logical type
		adjustedUTC := len(ts.TimeZone) > 0
493 switch ts.Unit {
494 case arrow.Microsecond:
495 return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitMicros)
496 case arrow.Millisecond:
497 return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitMillis)
498 case arrow.Nanosecond:
499 return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitNanos)
500 default:
501 panic("only milli, micro and nano units supported for arrow timestamp")
502 }
503 case arrow.TIME32:
504 return schema.NewTimeLogicalType(false, schema.TimeUnitMillis)
505 case arrow.TIME64:
506 ts := typ.(*arrow.Time64Type)
507 switch ts.Unit {
508 case arrow.Microsecond:
509 return schema.NewTimeLogicalType(false, schema.TimeUnitMicros)
510 case arrow.Nanosecond:
511 return schema.NewTimeLogicalType(false, schema.TimeUnitNanos)
512 default:
513 panic("only micro and nano seconds are supported for arrow TIME64")
514 }
515 case arrow.DECIMAL, arrow.DECIMAL256:
516 dec := typ.(arrow.DecimalType)
517 return schema.NewDecimalLogicalType(dec.GetPrecision(), dec.GetScale())
518 }
519 return schema.NoLogicalType{}
520 }
521
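// getPhysicalType maps an Arrow type to the Parquet physical type used to store it.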
522 func getPhysicalType(typ arrow.DataType) parquet.Type {
523 switch typ.ID() {
524 case arrow.DICTIONARY:
525 return getPhysicalType(typ.(*arrow.DictionaryType).ValueType)
526 case arrow.BOOL:
527 return parquet.Types.Boolean
528 case arrow.UINT8, arrow.INT8, arrow.UINT16, arrow.INT16, arrow.UINT32, arrow.INT32:
529 return parquet.Types.Int32
530 case arrow.INT64, arrow.UINT64:
531 return parquet.Types.Int64
532 case arrow.FLOAT32:
533 return parquet.Types.Float
534 case arrow.FLOAT64:
535 return parquet.Types.Double
536 case arrow.FLOAT16:
537 return parquet.Types.FixedLenByteArray
538 case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING:
539 return parquet.Types.ByteArray
540 case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL:
541 return parquet.Types.FixedLenByteArray
542 case arrow.DATE32:
543 return parquet.Types.Int32
	case arrow.DATE64:
		// Date64 values are coerced to days and stored as a 32-bit DATE
		return parquet.Types.Int32
547 case arrow.TIME32:
548 return parquet.Types.Int32
549 case arrow.TIME64, arrow.TIMESTAMP:
550 return parquet.Types.Int64
551 default:
552 return parquet.Types.Int32
553 }
554 }
555
556 const (
557 boolTestValue = true
558 uint8TestVal = uint8(64)
559 int8TestVal = int8(-64)
560 uint16TestVal = uint16(1024)
561 int16TestVal = int16(-1024)
562 uint32TestVal = uint32(1024)
563 int32TestVal = int32(-1024)
564 uint64TestVal = uint64(1024)
565 int64TestVal = int64(-1024)
566 tsTestValue = arrow.Timestamp(14695634030000)
567 date32TestVal = arrow.Date32(170000)
568 floatTestVal = float32(2.1)
569 doubleTestVal = float64(4.2)
570 strTestVal = "Test"
571
572 smallSize = 100
573 )
574
575 type ParquetIOTestSuite struct {
576 suite.Suite
577 }
578
579 func (ps *ParquetIOTestSuite) SetupTest() {
580 ps.NoError(arrow.RegisterExtensionType(types.NewUUIDType()))
581 }
582
583 func (ps *ParquetIOTestSuite) TearDownTest() {
584 if arrow.GetExtensionType("uuid") != nil {
585 ps.NoError(arrow.UnregisterExtensionType("uuid"))
586 }
587 }
588
589 func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.Repetition) *schema.GroupNode {
590 byteWidth := int32(-1)
591
592 switch typ := typ.(type) {
593 case *arrow.FixedSizeBinaryType:
594 byteWidth = int32(typ.ByteWidth)
595 case arrow.DecimalType:
596 byteWidth = pqarrow.DecimalSize(typ.GetPrecision())
597 case *arrow.Float16Type:
598 byteWidth = int32(typ.Bytes())
599 case *arrow.DictionaryType:
600 valuesType := typ.ValueType
601 switch dt := valuesType.(type) {
602 case *arrow.FixedSizeBinaryType:
603 byteWidth = int32(dt.ByteWidth)
604 case arrow.DecimalType:
605 byteWidth = pqarrow.DecimalSize(dt.GetPrecision())
606 case *arrow.Float16Type:
			byteWidth = int32(dt.Bytes())
608 }
609 }
610
611 pnode, _ := schema.NewPrimitiveNodeLogical("column1", rep, getLogicalType(typ), getPhysicalType(typ), int(byteWidth), -1)
612 return schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{pnode}, -1))
613 }
614
615 func (ps *ParquetIOTestSuite) makePrimitiveTestCol(mem memory.Allocator, size int, typ arrow.DataType) arrow.Array {
616 switch typ.ID() {
617 case arrow.BOOL:
618 bldr := array.NewBooleanBuilder(mem)
619 defer bldr.Release()
620 for i := 0; i < size; i++ {
621 bldr.Append(boolTestValue)
622 }
623 return bldr.NewArray()
624 case arrow.INT8:
625 bldr := array.NewInt8Builder(mem)
626 defer bldr.Release()
627 for i := 0; i < size; i++ {
628 bldr.Append(int8TestVal)
629 }
630 return bldr.NewArray()
631 case arrow.UINT8:
632 bldr := array.NewUint8Builder(mem)
633 defer bldr.Release()
634 for i := 0; i < size; i++ {
635 bldr.Append(uint8TestVal)
636 }
637 return bldr.NewArray()
638 case arrow.INT16:
639 bldr := array.NewInt16Builder(mem)
640 defer bldr.Release()
641 for i := 0; i < size; i++ {
642 bldr.Append(int16TestVal)
643 }
644 return bldr.NewArray()
645 case arrow.UINT16:
646 bldr := array.NewUint16Builder(mem)
647 defer bldr.Release()
648 for i := 0; i < size; i++ {
649 bldr.Append(uint16TestVal)
650 }
651 return bldr.NewArray()
652 case arrow.INT32:
653 bldr := array.NewInt32Builder(mem)
654 defer bldr.Release()
655 for i := 0; i < size; i++ {
656 bldr.Append(int32TestVal)
657 }
658 return bldr.NewArray()
659 case arrow.UINT32:
660 bldr := array.NewUint32Builder(mem)
661 defer bldr.Release()
662 for i := 0; i < size; i++ {
663 bldr.Append(uint32TestVal)
664 }
665 return bldr.NewArray()
666 case arrow.INT64:
667 bldr := array.NewInt64Builder(mem)
668 defer bldr.Release()
669 for i := 0; i < size; i++ {
670 bldr.Append(int64TestVal)
671 }
672 return bldr.NewArray()
673 case arrow.UINT64:
674 bldr := array.NewUint64Builder(mem)
675 defer bldr.Release()
676 for i := 0; i < size; i++ {
677 bldr.Append(uint64TestVal)
678 }
679 return bldr.NewArray()
680 case arrow.FLOAT32:
681 bldr := array.NewFloat32Builder(mem)
682 defer bldr.Release()
683 for i := 0; i < size; i++ {
684 bldr.Append(floatTestVal)
685 }
686 return bldr.NewArray()
687 case arrow.FLOAT64:
688 bldr := array.NewFloat64Builder(mem)
689 defer bldr.Release()
690 for i := 0; i < size; i++ {
691 bldr.Append(doubleTestVal)
692 }
693 return bldr.NewArray()
694 }
695 return nil
696 }
697
698 func (ps *ParquetIOTestSuite) makeTestFile(mem memory.Allocator, typ arrow.DataType, arr arrow.Array, numChunks int) []byte {
699 sc := ps.makeSimpleSchema(typ, parquet.Repetitions.Required)
700 sink := encoding.NewBufferWriter(0, mem)
701 defer sink.Release()
702 writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
703
704 props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
705 ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
706 rowGroupSize := arr.Len() / numChunks
707
708 for i := 0; i < numChunks; i++ {
709 rgw := writer.AppendRowGroup()
710 cw, err := rgw.NextColumn()
711 ps.NoError(err)
712
713 start := i * rowGroupSize
714 slc := array.NewSlice(arr, int64(start), int64(start+rowGroupSize))
715 defer slc.Release()
716 ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw, slc, nil, nil, false))
717 ps.NoError(cw.Close())
718 ps.NoError(rgw.Close())
719 }
720 ps.NoError(writer.Close())
721 buf := sink.Finish()
722 defer buf.Release()
723 return buf.Bytes()
724 }
725
726 func (ps *ParquetIOTestSuite) createReader(mem memory.Allocator, data []byte) *pqarrow.FileReader {
727 rdr, err := file.NewParquetReader(bytes.NewReader(data), file.WithReadProps(parquet.NewReaderProperties(mem)))
728 ps.NoError(err)
729
730 reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
731 ps.NoError(err)
732 return reader
733 }
734
735 func (ps *ParquetIOTestSuite) readTable(rdr *pqarrow.FileReader) arrow.Table {
736 tbl, err := rdr.ReadTable(context.TODO())
737 ps.NoError(err)
738 ps.NotNil(tbl)
739 return tbl
740 }
741
742 func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(mem memory.Allocator, typ arrow.DataType, numChunks int) {
743 values := ps.makePrimitiveTestCol(mem, smallSize, typ)
744 defer values.Release()
745
746 data := ps.makeTestFile(mem, typ, values, numChunks)
747 reader := ps.createReader(mem, data)
748
749 tbl := ps.readTable(reader)
750 defer tbl.Release()
751
752 ps.EqualValues(1, tbl.NumCols())
753 ps.EqualValues(smallSize, tbl.NumRows())
754
755 chunked := tbl.Column(0).Data()
756 ps.Len(chunked.Chunks(), 1)
757 ps.True(array.Equal(values, chunked.Chunk(0)))
758 }
759
760 func (ps *ParquetIOTestSuite) checkSingleColumnRead(mem memory.Allocator, typ arrow.DataType, numChunks int) {
761 values := ps.makePrimitiveTestCol(mem, smallSize, typ)
762 defer values.Release()
763
764 data := ps.makeTestFile(mem, typ, values, numChunks)
765 reader := ps.createReader(mem, data)
766
767 cr, err := reader.GetColumn(context.TODO(), 0)
768 ps.NoError(err)
769 defer cr.Release()
770
771 chunked, err := cr.NextBatch(smallSize)
772 ps.NoError(err)
773 defer chunked.Release()
774
775 ps.Len(chunked.Chunks(), 1)
776 ps.True(array.Equal(values, chunked.Chunk(0)))
777 }
778
779 func (ps *ParquetIOTestSuite) TestDateTimeTypesReadWriteTable() {
780 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
781 defer mem.AssertSize(ps.T(), 0)
782
783 toWrite := makeDateTimeTypesTable(mem, false, true)
784 defer toWrite.Release()
785 buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
786 defer buf.Release()
787
788 reader := ps.createReader(mem, buf.Bytes())
789 tbl := ps.readTable(reader)
790 defer tbl.Release()
791
792 expected := makeDateTimeTypesTable(mem, true, true)
793 defer expected.Release()
794
795 ps.Equal(expected.NumCols(), tbl.NumCols())
796 ps.Equal(expected.NumRows(), tbl.NumRows())
797 ps.Truef(expected.Schema().Equal(tbl.Schema()), "expected schema: %s\ngot schema: %s", expected.Schema(), tbl.Schema())
798
799 for i := 0; i < int(expected.NumCols()); i++ {
800 exChunk := expected.Column(i).Data()
801 tblChunk := tbl.Column(i).Data()
802
803 ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
804 ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), "expected %s\ngot %s", exChunk.Chunk(0), tblChunk.Chunk(0))
805 }
806 }
807
808 func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
809 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
810 defer mem.AssertSize(ps.T(), 0)
811
812 expected := makeDateTimeTypesTable(mem, false, true)
813 defer expected.Release()
814 buf := writeTableToBuffer(ps.T(), mem, expected, expected.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
815 defer buf.Release()
816
817 reader := ps.createReader(mem, buf.Bytes())
818 tbl := ps.readTable(reader)
819 defer tbl.Release()
820
821 ps.Equal(expected.NumCols(), tbl.NumCols())
822 ps.Equal(expected.NumRows(), tbl.NumRows())
823 ps.Truef(expected.Schema().Equal(tbl.Schema()), "expected schema: %s\ngot schema: %s", expected.Schema(), tbl.Schema())
824
825 for i := 0; i < int(expected.NumCols()); i++ {
826 exChunk := expected.Column(i).Data()
827 tblChunk := tbl.Column(i).Data()
828
829 ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
830 ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), "expected %s\ngot %s", exChunk.Chunk(0), tblChunk.Chunk(0))
831 }
832 }
833
834 func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
835 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
836 defer mem.AssertSize(ps.T(), 0)
837
838
839
840 lsBldr := array.NewLargeStringBuilder(mem)
841 defer lsBldr.Release()
842 lbBldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.LargeBinary)
843 defer lbBldr.Release()
844
845 for i := 0; i < smallSize; i++ {
846 s := strconv.FormatInt(int64(i), 10)
847 lsBldr.Append(s)
848 lbBldr.Append([]byte(s))
849 }
850
851 lsValues := lsBldr.NewArray()
852 defer lsValues.Release()
853 lbValues := lbBldr.NewArray()
854 defer lbValues.Release()
855
856 lsField := arrow.Field{Name: "large_string", Type: arrow.BinaryTypes.LargeString, Nullable: true}
857 lbField := arrow.Field{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary, Nullable: true}
858 expected := array.NewTable(
859 arrow.NewSchema([]arrow.Field{lsField, lbField}, nil),
860 []arrow.Column{
861 *arrow.NewColumn(lsField, arrow.NewChunked(lsField.Type, []arrow.Array{lsValues})),
862 *arrow.NewColumn(lbField, arrow.NewChunked(lbField.Type, []arrow.Array{lbValues})),
863 },
864 -1,
865 )
866 defer lsValues.Release()
867 defer lbValues.Release()
868 defer expected.Release()
869 ps.roundTripTable(mem, expected, true)
870 }
871
872 func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
873 types := []arrow.DataType{
874 arrow.FixedWidthTypes.Boolean,
875 arrow.PrimitiveTypes.Uint8,
876 arrow.PrimitiveTypes.Int8,
877 arrow.PrimitiveTypes.Uint16,
878 arrow.PrimitiveTypes.Int16,
879 arrow.PrimitiveTypes.Uint32,
880 arrow.PrimitiveTypes.Int32,
881 arrow.PrimitiveTypes.Uint64,
882 arrow.PrimitiveTypes.Int64,
883 arrow.PrimitiveTypes.Float32,
884 arrow.PrimitiveTypes.Float64,
885 }
886
887 nchunks := []int{1, 4}
888
889 for _, n := range nchunks {
890 for _, dt := range types {
891 ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), func() {
892 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
893 defer mem.AssertSize(ps.T(), 0)
894 ps.checkSingleColumnRead(mem, dt, n)
895 })
896 }
897 }
898 }
899
900 func (ps *ParquetIOTestSuite) TestSingleColumnRequiredRead() {
901 types := []arrow.DataType{
902 arrow.FixedWidthTypes.Boolean,
903 arrow.PrimitiveTypes.Uint8,
904 arrow.PrimitiveTypes.Int8,
905 arrow.PrimitiveTypes.Uint16,
906 arrow.PrimitiveTypes.Int16,
907 arrow.PrimitiveTypes.Uint32,
908 arrow.PrimitiveTypes.Int32,
909 arrow.PrimitiveTypes.Uint64,
910 arrow.PrimitiveTypes.Int64,
911 arrow.PrimitiveTypes.Float32,
912 arrow.PrimitiveTypes.Float64,
913 }
914
915 nchunks := []int{1, 4}
916
917 for _, n := range nchunks {
918 for _, dt := range types {
919 ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), func() {
920 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
921 defer mem.AssertSize(ps.T(), 0)
922
923 ps.checkSingleColumnRequiredTableRead(mem, dt, n)
924 })
925 }
926 }
927 }
928
929 func (ps *ParquetIOTestSuite) TestReadDecimals() {
930 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
931 defer mem.AssertSize(ps.T(), 0)
932
	bigEndian := []parquet.ByteArray{
		// 123456
		[]byte{1, 226, 64},
		// 987654
		[]byte{15, 18, 6},
		// -123456
		[]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192},
	}
941
942 bldr := array.NewDecimal128Builder(mem, &arrow.Decimal128Type{Precision: 6, Scale: 3})
943 defer bldr.Release()
944
945 bldr.Append(decimal128.FromU64(123456))
946 bldr.Append(decimal128.FromU64(987654))
947 bldr.Append(decimal128.FromI64(-123456))
948
949 expected := bldr.NewDecimal128Array()
950 defer expected.Release()
951
952 sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
953 schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required, schema.NewDecimalLogicalType(6, 3), parquet.Types.ByteArray, -1, -1)),
954 }, -1))
955
956 sink := encoding.NewBufferWriter(0, mem)
957 defer sink.Release()
958 writer := file.NewParquetWriter(sink, sc)
959
960 rgw := writer.AppendRowGroup()
961 cw, _ := rgw.NextColumn()
962 cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
963 cw.Close()
964 rgw.Close()
965 writer.Close()
966
967 rdr := ps.createReader(mem, sink.Bytes())
968 cr, err := rdr.GetColumn(context.TODO(), 0)
969 ps.NoError(err)
970
971 chunked, err := cr.NextBatch(smallSize)
972 ps.NoError(err)
973 defer chunked.Release()
974
975 ps.Len(chunked.Chunks(), 1)
976 ps.True(array.Equal(expected, chunked.Chunk(0)))
977 }
978
979 func (ps *ParquetIOTestSuite) TestReadDecimal256() {
980 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
981 defer mem.AssertSize(ps.T(), 0)
982
	bigEndian := []parquet.ByteArray{
		// 123456
		[]byte{1, 226, 64},
		// 987654
		[]byte{15, 18, 6},
		// -123456
		[]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192},
	}
991
992 bldr := array.NewDecimal256Builder(mem, &arrow.Decimal256Type{Precision: 40, Scale: 3})
993 defer bldr.Release()
994
995 bldr.Append(decimal256.FromU64(123456))
996 bldr.Append(decimal256.FromU64(987654))
997 bldr.Append(decimal256.FromI64(-123456))
998
999 expected := bldr.NewDecimal256Array()
1000 defer expected.Release()
1001
1002 sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
1003 schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required, schema.NewDecimalLogicalType(40, 3), parquet.Types.ByteArray, -1, -1)),
1004 }, -1))
1005
1006 sink := encoding.NewBufferWriter(0, mem)
1007 defer sink.Release()
1008 writer := file.NewParquetWriter(sink, sc)
1009
1010 rgw := writer.AppendRowGroup()
1011 cw, _ := rgw.NextColumn()
1012 cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
1013 cw.Close()
1014 rgw.Close()
1015 writer.Close()
1016
1017 rdr := ps.createReader(mem, sink.Bytes())
1018 cr, err := rdr.GetColumn(context.TODO(), 0)
1019 ps.NoError(err)
1020
1021 chunked, err := cr.NextBatch(smallSize)
1022 ps.NoError(err)
1023 defer chunked.Release()
1024
1025 ps.Len(chunked.Chunks(), 1)
1026 ps.Truef(array.Equal(expected, chunked.Chunk(0)), "expected: %s\ngot: %s", expected, chunked.Chunk(0))
1027 }
1028
1029 func (ps *ParquetIOTestSuite) TestReadNestedStruct() {
1030 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1031 defer mem.AssertSize(ps.T(), 0)
1032
1033 dt := arrow.StructOf(arrow.Field{
1034 Name: "nested",
1035 Type: arrow.StructOf(
1036 arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
1037 arrow.Field{Name: "int32", Type: arrow.PrimitiveTypes.Int32},
1038 arrow.Field{Name: "int64", Type: arrow.PrimitiveTypes.Int64},
1039 ),
1040 })
1041 field := arrow.Field{Name: "struct", Type: dt, Nullable: true}
1042
1043 builder := array.NewStructBuilder(mem, dt)
1044 defer builder.Release()
1045 nested := builder.FieldBuilder(0).(*array.StructBuilder)
1046
1047 builder.Append(true)
1048 nested.Append(true)
1049 nested.FieldBuilder(0).(*array.BooleanBuilder).Append(true)
1050 nested.FieldBuilder(1).(*array.Int32Builder).Append(int32(-1))
1051 nested.FieldBuilder(2).(*array.Int64Builder).Append(int64(-2))
1052 builder.AppendNull()
1053
1054 arr := builder.NewStructArray()
1055 defer arr.Release()
1056
1057 expected := array.NewTable(
1058 arrow.NewSchema([]arrow.Field{field}, nil),
1059 []arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(dt, []arrow.Array{arr}))},
1060 -1,
1061 )
1062 defer arr.Release()
1063 defer expected.Release()
1064 ps.roundTripTable(mem, expected, true)
1065 }
1066
1067 func (ps *ParquetIOTestSuite) writeColumn(mem memory.Allocator, sc *schema.GroupNode, values arrow.Array) []byte {
1068 var buf bytes.Buffer
1069 arrsc, err := pqarrow.FromParquet(schema.NewSchema(sc), nil, nil)
1070 ps.NoError(err)
1071
1072 writer, err := pqarrow.NewFileWriter(arrsc, &buf, parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
1073 ps.NoError(err)
1074
1075 writer.NewRowGroup()
1076 ps.NoError(writer.WriteColumnData(values))
1077
	ps.NoError(writer.Close())
1080
1081 return buf.Bytes()
1082 }
1083
1084 func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(mem memory.Allocator, data []byte, values arrow.Array) {
1085 reader := ps.createReader(mem, data)
1086 cr, err := reader.GetColumn(context.TODO(), 0)
1087 ps.NoError(err)
1088 ps.NotNil(cr)
1089 defer cr.Release()
1090
1091 chunked, err := cr.NextBatch(smallSize)
1092 ps.NoError(err)
1093 defer chunked.Release()
1094
1095 ps.Len(chunked.Chunks(), 1)
1096 ps.NotNil(chunked.Chunk(0))
1097
1098 ps.True(array.Equal(values, chunked.Chunk(0)))
1099 }
1100
1101 var fullTypeList = []arrow.DataType{
1102 arrow.FixedWidthTypes.Boolean,
1103 arrow.PrimitiveTypes.Uint8,
1104 arrow.PrimitiveTypes.Int8,
1105 arrow.PrimitiveTypes.Uint16,
1106 arrow.PrimitiveTypes.Int16,
1107 arrow.PrimitiveTypes.Uint32,
1108 arrow.PrimitiveTypes.Int32,
1109 arrow.PrimitiveTypes.Uint64,
1110 arrow.PrimitiveTypes.Int64,
1111 arrow.FixedWidthTypes.Date32,
1112 arrow.PrimitiveTypes.Float32,
1113 arrow.PrimitiveTypes.Float64,
1114 arrow.FixedWidthTypes.Float16,
1115 arrow.BinaryTypes.String,
1116 arrow.BinaryTypes.Binary,
1117 &arrow.FixedSizeBinaryType{ByteWidth: 10},
1118 &arrow.Decimal128Type{Precision: 1, Scale: 0},
1119 &arrow.Decimal128Type{Precision: 5, Scale: 4},
1120 &arrow.Decimal128Type{Precision: 10, Scale: 9},
1121 &arrow.Decimal128Type{Precision: 19, Scale: 18},
1122 &arrow.Decimal128Type{Precision: 23, Scale: 22},
1123 &arrow.Decimal128Type{Precision: 27, Scale: 26},
1124 &arrow.Decimal128Type{Precision: 38, Scale: 37},
1125 }
1126
1127 func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() {
1128 for _, dt := range fullTypeList {
1129 ps.Run(dt.Name(), func() {
1130 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1131 defer mem.AssertSize(ps.T(), 0)
1132
1133 values := testutils.RandomNonNull(mem, dt, smallSize)
1134 defer values.Release()
1135 sc := ps.makeSimpleSchema(dt, parquet.Repetitions.Required)
1136 data := ps.writeColumn(mem, sc, values)
1137 ps.readAndCheckSingleColumnFile(mem, data, values)
1138 })
1139 }
1140 }
1141
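// roundTripTable writes expected with pqarrow.WriteTable (optionally storing the Arrow
// schema in the file metadata), reads the file back, and compares row/column counts and
// the first column chunk for approximate equality. As a minimal sketch of the same round
// trip outside the suite helpers (error handling omitted, for some arrow.Table tbl):
//
//	var buf bytes.Buffer
//	_ = pqarrow.WriteTable(tbl, &buf, tbl.NumRows(),
//		parquet.NewWriterProperties(), pqarrow.NewArrowWriterProperties())
//	pf, _ := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
//	rdr, _ := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
//	out, _ := rdr.ReadTable(context.Background())
//	defer out.Release()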
1142 func (ps *ParquetIOTestSuite) roundTripTable(mem memory.Allocator, expected arrow.Table, storeSchema bool) {
1143 var buf bytes.Buffer
1144 var props pqarrow.ArrowWriterProperties
1145 if storeSchema {
1146 props = pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema(), pqarrow.WithAllocator(mem))
1147 } else {
1148 props = pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
1149 }
1150
1151 writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
1152 ps.Require().NoError(pqarrow.WriteTable(expected, &buf, expected.NumRows(), writeProps, props))
1153
1154 reader := ps.createReader(mem, buf.Bytes())
1155 defer reader.ParquetReader().Close()
1156
1157 tbl := ps.readTable(reader)
1158 defer tbl.Release()
1159
1160 ps.Equal(expected.NumCols(), tbl.NumCols())
1161 ps.Equal(expected.NumRows(), tbl.NumRows())
1162
1163 exChunk := expected.Column(0).Data()
1164 tblChunk := tbl.Column(0).Data()
1165
1166 ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
1167 exc := exChunk.Chunk(0)
1168 tbc := tblChunk.Chunk(0)
1169 ps.Truef(array.ApproxEqual(exc, tbc), "expected: %T %s\ngot: %T %s", exc, exc, tbc, tbc)
1170 }
1171
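// makeEmptyListsArray returns a list array of the given length whose entries are all
// empty (non-null) lists of float32.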
1172 func makeEmptyListsArray(size int) arrow.Array {
	// an offsets buffer of all zeros and no child values yields `size` empty lists
	offsetsNbytes := arrow.Int32Traits.BytesRequired(size + 1)
	offsetsBuffer := make([]byte, offsetsNbytes)
1176
1177 childBuffers := []*memory.Buffer{nil, nil}
1178 childData := array.NewData(arrow.PrimitiveTypes.Float32, 0, childBuffers, nil, 0, 0)
1179 defer childData.Release()
1180 buffers := []*memory.Buffer{nil, memory.NewBufferBytes(offsetsBuffer)}
1181 arrayData := array.NewData(arrow.ListOf(childData.DataType()), size, buffers, []arrow.ArrayData{childData}, 0, 0)
1182 defer arrayData.Release()
1183 return array.MakeFromData(arrayData)
1184 }
1185
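// makeListArray wraps values in a list array of the given size, distributing the
// values evenly over the non-null entries, placing nulls at even indices, and leaving
// entry 1 as an empty list.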
1186 func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
1187 nonNullEntries := size - nullcount - 1
1188 lengthPerEntry := values.Len() / nonNullEntries
1189
1190 offsets := make([]byte, arrow.Int32Traits.BytesRequired(size+1))
1191 offsetsArr := arrow.Int32Traits.CastFromBytes(offsets)
1192
1193 nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))
1194
	curOffset := 0
	for i := 0; i < size; i++ {
		offsetsArr[i] = int32(curOffset)
		if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
			// valid entry: nulls are placed at the even indices 0..2*(nullcount-1),
			// and entry 1 is kept empty so empty lists are exercised as well
			bitutil.SetBit(nullBitmap, i)
			if i != 1 {
				curOffset += lengthPerEntry
			}
		}
	}
1206 offsetsArr[size] = int32(values.Len())
1207
1208 listData := array.NewData(arrow.ListOf(values.DataType()), size,
1209 []*memory.Buffer{memory.NewBufferBytes(nullBitmap), memory.NewBufferBytes(offsets)},
1210 []arrow.ArrayData{values.Data()}, nullcount, 0)
1211 defer listData.Release()
1212 return array.NewListData(listData)
1213 }
1214
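// prepareEmptyListsTable wraps an all-empty-lists column in a single-column table.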
1215 func prepareEmptyListsTable(size int) arrow.Table {
1216 lists := makeEmptyListsArray(size)
1217 defer lists.Release()
1218 chunked := arrow.NewChunked(lists.DataType(), []arrow.Array{lists})
1219 defer chunked.Release()
1220 return makeSimpleTable(chunked, true)
1221 }
1222
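// prepareListTable builds a single-column table of lists of dt, optionally with null
// lists and/or null elements, sliced so the column data carries a non-zero offset.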
1223 func prepareListTable(dt arrow.DataType, size int, nullableLists bool, nullableElems bool, nullCount int) arrow.Table {
1224 nc := nullCount
1225 if !nullableElems {
1226 nc = 0
1227 }
1228 values := testutils.RandomNullable(dt, size*size, nc)
1229 defer values.Release()
1230
1231 values = array.NewSlice(values, 5, int64(values.Len()))
1232 defer values.Release()
1233
1234 if !nullableLists {
1235 nullCount = 0
1236 }
1237 lists := makeListArray(values, size, nullCount)
1238 defer lists.Release()
1239
1240 chunked := arrow.NewChunked(lists.DataType(), []arrow.Array{lists})
1241 defer chunked.Release()
1242
1243 return makeSimpleTable(array.NewChunkedSlice(chunked, 3, int64(size)), nullableLists)
1244 }
1245
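// prepareListOfListTable builds a single-column table of list-of-list-of-dt, with
// nullability controlled independently for the outer lists, inner lists, and elements.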
1246 func prepareListOfListTable(dt arrow.DataType, size, nullCount int, nullableParentLists, nullableLists, nullableElems bool) arrow.Table {
1247 nc := nullCount
1248 if !nullableElems {
1249 nc = 0
1250 }
1251
1252 values := testutils.RandomNullable(dt, size*6, nc)
1253 defer values.Release()
1254
1255 if nullableLists {
1256 nc = nullCount
1257 } else {
1258 nc = 0
1259 }
1260
1261 lists := makeListArray(values, size*3, nc)
1262 defer lists.Release()
1263
1264 if !nullableParentLists {
1265 nullCount = 0
1266 }
1267
1268 parentLists := makeListArray(lists, size, nullCount)
1269 defer parentLists.Release()
1270
1271 chunked := arrow.NewChunked(parentLists.DataType(), []arrow.Array{parentLists})
1272 defer chunked.Release()
1273
1274 return makeSimpleTable(chunked, nullableParentLists)
1275 }
1276
1277 func (ps *ParquetIOTestSuite) TestSingleEmptyListsColumnReadWrite() {
1278 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1279 defer mem.AssertSize(ps.T(), 0)
1280
1281 expected := prepareEmptyListsTable(smallSize)
1282 defer expected.Release()
1283 buf := writeTableToBuffer(ps.T(), mem, expected, smallSize, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
1284 defer buf.Release()
1285
1286 reader := ps.createReader(mem, buf.Bytes())
1287 tbl := ps.readTable(reader)
1288 defer tbl.Release()
1289
1290 ps.EqualValues(expected.NumCols(), tbl.NumCols())
1291 ps.EqualValues(expected.NumRows(), tbl.NumRows())
1292
1293 exChunk := expected.Column(0).Data()
1294 tblChunk := tbl.Column(0).Data()
1295
1296 ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
1297 ps.True(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)))
1298 }
1299
1300 func (ps *ParquetIOTestSuite) TestSingleColumnOptionalReadWrite() {
1301 for _, dt := range fullTypeList {
1302 ps.Run(dt.Name(), func() {
1303 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1304 defer mem.AssertSize(ps.T(), 0)
1305
1306 values := testutils.RandomNullable(dt, smallSize, 10)
1307 defer values.Release()
1308 sc := ps.makeSimpleSchema(dt, parquet.Repetitions.Optional)
1309 data := ps.writeColumn(mem, sc, values)
1310 ps.readAndCheckSingleColumnFile(mem, data, values)
1311 })
1312 }
1313 }
1314
1315 func (ps *ParquetIOTestSuite) TestSingleNullableListNullableColumnReadWrite() {
1316 for _, dt := range fullTypeList {
1317 ps.Run(dt.Name(), func() {
1318 expected := prepareListTable(dt, smallSize, true, true, 10)
1319 defer expected.Release()
1320 ps.roundTripTable(memory.DefaultAllocator, expected, false)
1321 })
1322 }
1323 }
1324
1325 func (ps *ParquetIOTestSuite) TestSingleRequiredListNullableColumnReadWrite() {
1326 for _, dt := range fullTypeList {
1327 ps.Run(dt.Name(), func() {
1328 expected := prepareListTable(dt, smallSize, false, true, 10)
1329 defer expected.Release()
1330 ps.roundTripTable(memory.DefaultAllocator, expected, false)
1331 })
1332 }
1333 }
1334
1335 func (ps *ParquetIOTestSuite) TestSingleNullableListRequiredColumnReadWrite() {
1336 for _, dt := range fullTypeList {
1337 ps.Run(dt.Name(), func() {
1338 expected := prepareListTable(dt, smallSize, true, false, 10)
1339 defer expected.Release()
1340 ps.roundTripTable(memory.DefaultAllocator, expected, false)
1341 })
1342 }
1343 }
1344
1345 func (ps *ParquetIOTestSuite) TestSingleRequiredListRequiredColumnReadWrite() {
1346 for _, dt := range fullTypeList {
1347 ps.Run(dt.Name(), func() {
1348 expected := prepareListTable(dt, smallSize, false, false, 0)
1349 defer expected.Release()
1350 ps.roundTripTable(memory.DefaultAllocator, expected, false)
1351 })
1352 }
1353 }
1354
1355 func (ps *ParquetIOTestSuite) TestSingleNullableListRequiredListRequiredColumnReadWrite() {
1356 for _, dt := range fullTypeList {
1357 ps.Run(dt.Name(), func() {
1358 expected := prepareListOfListTable(dt, smallSize, 2, true, false, false)
1359 defer expected.Release()
1360 ps.roundTripTable(memory.DefaultAllocator, expected, false)
1361 })
1362 }
1363 }
1364
1365 func (ps *ParquetIOTestSuite) TestSimpleStruct() {
1366 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1367 defer mem.AssertSize(ps.T(), 0)
1368
1369 links := arrow.StructOf(arrow.Field{Name: "Backward", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
1370 arrow.Field{Name: "Forward", Type: arrow.PrimitiveTypes.Int64, Nullable: true})
1371
1372 bldr := array.NewStructBuilder(mem, links)
1373 defer bldr.Release()
1374
1375 backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
1376 forwardBldr := bldr.FieldBuilder(1).(*array.Int64Builder)
1377
1378 bldr.Append(true)
1379 backBldr.AppendNull()
1380 forwardBldr.Append(20)
1381
1382 bldr.Append(true)
1383 backBldr.Append(10)
1384 forwardBldr.Append(40)
1385
1386 data := bldr.NewArray()
1387 defer data.Release()
1388
1389 tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", Type: links}}, nil),
1390 []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", Type: links}, arrow.NewChunked(links, []arrow.Array{data}))}, -1)
1391 defer data.Release()
1392 defer tbl.Release()
1393
1394 ps.roundTripTable(mem, tbl, false)
1395 }
1396
1397 func (ps *ParquetIOTestSuite) TestSingleColumnNullableStruct() {
1398 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1399 defer mem.AssertSize(ps.T(), 0)
1400
1401 links := arrow.StructOf(arrow.Field{Name: "Backward", Type: arrow.PrimitiveTypes.Int64, Nullable: true})
1402 bldr := array.NewStructBuilder(mem, links)
1403 defer bldr.Release()
1404
1405 backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
1406
1407 bldr.AppendNull()
1408 bldr.Append(true)
1409 backBldr.Append(10)
1410
1411 data := bldr.NewArray()
1412 defer data.Release()
1413
1414 tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", Type: links, Nullable: true}}, nil),
1415 []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", Type: links, Nullable: true}, arrow.NewChunked(links, []arrow.Array{data}))}, -1)
1416 defer data.Release()
1417 defer tbl.Release()
1418
1419 ps.roundTripTable(mem, tbl, false)
1420 }
1421
1422 func (ps *ParquetIOTestSuite) TestNestedRequiredFieldStruct() {
1423 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1424 defer mem.AssertSize(ps.T(), 0)
1425
1426 intField := arrow.Field{Name: "int_array", Type: arrow.PrimitiveTypes.Int32}
1427 intBldr := array.NewInt32Builder(mem)
1428 defer intBldr.Release()
1429 intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, nil)
1430
1431 intArr := intBldr.NewArray()
1432 defer intArr.Release()
1433
1434 validity := memory.NewBufferBytes([]byte{0xCC})
1435 defer validity.Release()
1436
1437 structField := arrow.Field{Name: "root", Type: arrow.StructOf(intField), Nullable: true}
1438 structData := array.NewData(structField.Type, 8, []*memory.Buffer{validity}, []arrow.ArrayData{intArr.Data()}, 4, 0)
1439 defer structData.Release()
1440 stData := array.NewStructData(structData)
1441 defer stData.Release()
1442
1443 tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
1444 []arrow.Column{*arrow.NewColumn(structField,
1445 arrow.NewChunked(structField.Type, []arrow.Array{stData}))}, -1)
1446 defer stData.Release()
1447 defer tbl.Release()
1448
1449 ps.roundTripTable(mem, tbl, false)
1450 }
1451
1452 func (ps *ParquetIOTestSuite) TestNestedNullableField() {
1453 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1454 defer mem.AssertSize(ps.T(), 0)
1455
1456 intField := arrow.Field{Name: "int_array", Type: arrow.PrimitiveTypes.Int32, Nullable: true}
1457 intBldr := array.NewInt32Builder(mem)
1458 defer intBldr.Release()
1459 intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, []bool{true, false, true, false, true, true, false, true})
1460
1461 intArr := intBldr.NewArray()
1462 defer intArr.Release()
1463
1464 validity := memory.NewBufferBytes([]byte{0xCC})
1465 defer validity.Release()
1466
1467 structField := arrow.Field{Name: "root", Type: arrow.StructOf(intField), Nullable: true}
1468 data := array.NewData(structField.Type, 8, []*memory.Buffer{validity}, []arrow.ArrayData{intArr.Data()}, 4, 0)
1469 defer data.Release()
1470 stData := array.NewStructData(data)
1471 defer stData.Release()
1472
1473 tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
1474 []arrow.Column{*arrow.NewColumn(structField,
1475 arrow.NewChunked(structField.Type, []arrow.Array{stData}))}, -1)
1476 defer stData.Release()
1477 defer tbl.Release()
1478
1479 ps.roundTripTable(mem, tbl, false)
1480 }
1481
1482 func (ps *ParquetIOTestSuite) TestNestedEmptyList() {
1483 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1484 defer mem.AssertSize(ps.T(), 0)
1485
1486 bldr := array.NewStructBuilder(mem, arrow.StructOf(
1487 arrow.Field{
1488 Name: "root",
1489 Type: arrow.StructOf(
1490 arrow.Field{
1491 Name: "child1",
1492 Type: arrow.ListOf(arrow.StructOf(
1493 arrow.Field{
1494 Name: "child2",
1495 Type: arrow.ListOf(arrow.StructOf(
1496 arrow.Field{
1497 Name: "name",
1498 Type: arrow.BinaryTypes.String,
1499 },
1500 )),
1501 },
1502 )),
1503 },
1504 ),
1505 },
1506 ))
1507 defer bldr.Release()
1508
1509 rootBldr := bldr.FieldBuilder(0).(*array.StructBuilder)
1510 child1Bldr := rootBldr.FieldBuilder(0).(*array.ListBuilder)
1511 child1ElBldr := child1Bldr.ValueBuilder().(*array.StructBuilder)
1512 child2Bldr := child1ElBldr.FieldBuilder(0).(*array.ListBuilder)
1513 leafBldr := child2Bldr.ValueBuilder().(*array.StructBuilder)
1514 nameBldr := leafBldr.FieldBuilder(0).(*array.StringBuilder)
1515
	// each of the 8 rows appended below has the shape
	// {"root": {"child1": [
	//     {"child2": [{"name": "foo"}]},
	//     {"child2": []}
	// ]}}
	// i.e. the second child1 element carries an empty nested list
1526 for i := 0; i < 8; i++ {
1527 bldr.Append(true)
1528 rootBldr.Append(true)
1529 child1Bldr.Append(true)
1530
1531 child1ElBldr.Append(true)
1532 child2Bldr.Append(true)
1533 leafBldr.Append(true)
1534 nameBldr.Append("foo")
1535
1536 child1ElBldr.Append(true)
1537 child2Bldr.Append(true)
1538 }
1539
1540 arr := bldr.NewArray()
1541 defer arr.Release()
1542
1543 field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
1544 expected := array.NewTableFromSlice(arrow.NewSchema([]arrow.Field{field}, nil), [][]arrow.Array{{arr}})
1545 defer expected.Release()
1546
1547 ps.roundTripTable(mem, expected, false)
1548 }
1549
1550 func (ps *ParquetIOTestSuite) TestCanonicalNestedRoundTrip() {
1551 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1552 defer mem.AssertSize(ps.T(), 0)
1553
1554 docIdField := arrow.Field{Name: "DocID", Type: arrow.PrimitiveTypes.Int64}
1555 linksField := arrow.Field{Name: "Links", Type: arrow.StructOf(
1556 arrow.Field{Name: "Backward", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
1557 arrow.Field{Name: "Forward", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
1558 ), Nullable: true}
1559
1560 nameStruct := arrow.StructOf(
1561 arrow.Field{Name: "Language", Nullable: true, Type: arrow.ListOf(
1562 arrow.StructOf(arrow.Field{Name: "Code", Type: arrow.BinaryTypes.String},
1563 arrow.Field{Name: "Country", Type: arrow.BinaryTypes.String, Nullable: true}))},
1564 arrow.Field{Name: "Url", Type: arrow.BinaryTypes.String, Nullable: true})
1565
1566 nameField := arrow.Field{Name: "Name", Type: arrow.ListOf(nameStruct)}
1567 sc := arrow.NewSchema([]arrow.Field{docIdField, linksField, nameField}, nil)
1568
1569 docIDArr, _, err := array.FromJSON(mem, docIdField.Type, strings.NewReader("[10, 20]"))
1570 ps.Require().NoError(err)
1571 defer docIDArr.Release()
1572
1573 linksIDArr, _, err := array.FromJSON(mem, linksField.Type, strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]}, {"Backward":[10, 30], "Forward": [80]}]`))
1574 ps.Require().NoError(err)
1575 defer linksIDArr.Release()
1576
1577 nameArr, _, err := array.FromJSON(mem, nameField.Type, strings.NewReader(`
1578 [[{"Language": [{"Code": "en_us", "Country": "us"},
1579 {"Code": "en_us", "Country": null}],
1580 "Url": "http://A"},
1581 {"Url": "http://B", "Language": null},
1582 {"Language": [{"Code": "en-gb", "Country": "gb"}], "Url": null}],
1583 [{"Url": "http://C", "Language": null}]]`))
1584 ps.Require().NoError(err)
1585 defer nameArr.Release()
1586
1587 expected := array.NewTable(sc, []arrow.Column{
1588 *arrow.NewColumn(docIdField, arrow.NewChunked(docIdField.Type, []arrow.Array{docIDArr})),
1589 *arrow.NewColumn(linksField, arrow.NewChunked(linksField.Type, []arrow.Array{linksIDArr})),
1590 *arrow.NewColumn(nameField, arrow.NewChunked(nameField.Type, []arrow.Array{nameArr})),
1591 }, 2)
1592 defer docIDArr.Release()
1593 defer linksIDArr.Release()
1594 defer nameArr.Release()
1595 defer expected.Release()
1596
1597 ps.roundTripTable(mem, expected, false)
1598 }
1599
1600 func (ps *ParquetIOTestSuite) TestFixedSizeList() {
1601 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1602 defer mem.AssertSize(ps.T(), 0)
1603
1604 bldr := array.NewFixedSizeListBuilder(mem, 3, arrow.PrimitiveTypes.Int16)
1605 defer bldr.Release()
1606
1607 vb := bldr.ValueBuilder().(*array.Int16Builder)
1608
1609 bldr.AppendValues([]bool{true, true, true})
1610 vb.AppendValues([]int16{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
1611
1612 data := bldr.NewArray()
1613 defer data.Release()
1614
1615 field := arrow.Field{Name: "root", Type: data.DataType(), Nullable: true}
1616 cnk := arrow.NewChunked(field.Type, []arrow.Array{data})
1617 defer data.Release()
1618
1619 tbl := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil), []arrow.Column{*arrow.NewColumn(field, cnk)}, -1)
1620 defer cnk.Release()
1621 defer tbl.Release()
1622
1623 ps.roundTripTable(mem, tbl, true)
1624 }
1625
1626 func (ps *ParquetIOTestSuite) TestNull() {
1627 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1628 defer mem.AssertSize(ps.T(), 0)
1629
1630 bldr := array.NewNullBuilder(mem)
1631 defer bldr.Release()
1632
1633 bldr.AppendNull()
1634 bldr.AppendNull()
1635 bldr.AppendNull()
1636
1637 data := bldr.NewArray()
1638 defer data.Release()
1639
1640 field := arrow.Field{Name: "x", Type: data.DataType(), Nullable: true}
1641 expected := array.NewTable(
1642 arrow.NewSchema([]arrow.Field{field}, nil),
1643 []arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{data}))},
1644 -1,
	)
	defer expected.Release()
1646
1647 ps.roundTripTable(mem, expected, true)
1648 }
1649
1650
func (ps *ParquetIOTestSuite) TestNullableListOfStruct() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	bldr := array.NewListBuilder(mem, arrow.StructOf(
		arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
		arrow.Field{Name: "b", Type: arrow.BinaryTypes.String},
	))
	defer bldr.Release()

	stBldr := bldr.ValueBuilder().(*array.StructBuilder)
	aBldr := stBldr.FieldBuilder(0).(*array.Int32Builder)
	bBldr := stBldr.FieldBuilder(1).(*array.StringBuilder)

	for i := 0; i < 320; i++ {
		if i%5 == 0 {
			bldr.AppendNull()
			continue
		}
		bldr.Append(true)
		for j := 0; j < 4; j++ {
			stBldr.Append(true)
			aBldr.Append(int32(i + j))
			bBldr.Append(strconv.Itoa(i + j))
		}
	}

	arr := bldr.NewArray()
	defer arr.Release()

	field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
	expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
		[]arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
	defer arr.Release()
	defer expected.Release()

	ps.roundTripTable(mem, expected, false)
}

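// TestStructWithListOfNestedStructs round-trips a struct column whose only
// field is a list of structs that themselves wrap another struct with a
// single string field, covering three levels of nesting.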
func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	bldr := array.NewStructBuilder(mem, arrow.StructOf(
		arrow.Field{
			Nullable: true,
			Name:     "l",
			Type: arrow.ListOf(arrow.StructOf(
				arrow.Field{
					Nullable: true,
					Name:     "a",
					Type: arrow.StructOf(
						arrow.Field{
							Nullable: true,
							Name:     "b",
							Type:     arrow.BinaryTypes.String,
						},
					),
				},
			)),
		},
	))
	defer bldr.Release()

	lBldr := bldr.FieldBuilder(0).(*array.ListBuilder)
	stBldr := lBldr.ValueBuilder().(*array.StructBuilder)
	aBldr := stBldr.FieldBuilder(0).(*array.StructBuilder)
	bBldr := aBldr.FieldBuilder(0).(*array.StringBuilder)

	bldr.AppendNull()
	bldr.Append(true)
	lBldr.Append(true)
	for i := 0; i < 8; i++ {
		stBldr.Append(true)
		aBldr.Append(true)
		bBldr.Append(strconv.Itoa(i))
	}

	arr := bldr.NewArray()
	defer arr.Release()

	field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
	expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
		[]arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
	defer arr.Release()
	defer expected.Release()

	ps.roundTripTable(mem, expected, false)
}

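// TestParquetArrowIO runs the ParquetIOTestSuite.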
func TestParquetArrowIO(t *testing.T) {
	suite.Run(t, new(ParquetIOTestSuite))
}

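// TestBufferedRecWrite writes a single record as two slices using
// WriteBuffered and then checks the result: as the assertions below rely on,
// buffered writes are expected to append to the current row group rather
// than starting a new row group per call, so both slices land in one row group.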
func TestBufferedRecWrite(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	sc := arrow.NewSchema([]arrow.Field{
		{Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: true},
		{Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
		{Name: "struct_i64_f64", Type: arrow.StructOf(
			arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
			arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64, Nullable: true})},
	}, nil)

	structData := array.NewData(sc.Field(2).Type, SIZELEN,
		[]*memory.Buffer{nil, nil},
		[]arrow.ArrayData{testutils.RandomNullable(arrow.PrimitiveTypes.Int64, SIZELEN, 0).Data(), testutils.RandomNullable(arrow.PrimitiveTypes.Float64, SIZELEN, 0).Data()}, 0, 0)
	defer structData.Release()
	cols := []arrow.Array{
		testutils.RandomNullable(sc.Field(0).Type, SIZELEN, SIZELEN/5),
		testutils.RandomNullable(sc.Field(1).Type, SIZELEN, SIZELEN/5),
		array.NewStructData(structData),
	}

	rec := array.NewRecord(sc, cols, SIZELEN)
	defer rec.Release()

	var buf bytes.Buffer

	wr, err := pqarrow.NewFileWriter(sc, &buf,
		parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy), parquet.WithDictionaryDefault(false), parquet.WithDataPageSize(100*1024)),
		pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
	require.NoError(t, err)

	p1 := rec.NewSlice(0, SIZELEN/2)
	defer p1.Release()
	require.NoError(t, wr.WriteBuffered(p1))

	p2 := rec.NewSlice(SIZELEN/2, SIZELEN)
	defer p2.Release()
	require.NoError(t, wr.WriteBuffered(p2))

	require.NoError(t, wr.Close())

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	assert.NoError(t, err)

	assert.EqualValues(t, 1, rdr.NumRowGroups())
	assert.EqualValues(t, SIZELEN, rdr.NumRows())
	rdr.Close()

	tbl, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(buf.Bytes()), nil, pqarrow.ArrowReadProperties{}, nil)
	assert.NoError(t, err)
	defer tbl.Release()

	assert.EqualValues(t, SIZELEN, tbl.NumRows())
}

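// TestArrowMapTypeRoundTrip round-trips a map<utf8, int32> column that
// includes a null map entry and a null item value.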
func (ps *ParquetIOTestSuite) TestArrowMapTypeRoundTrip() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	bldr := array.NewMapBuilder(mem, arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32, false)
	defer bldr.Release()

	kb := bldr.KeyBuilder().(*array.StringBuilder)
	ib := bldr.ItemBuilder().(*array.Int32Builder)

	bldr.Append(true)
	kb.AppendValues([]string{"Fee", "Fi", "Fo", "Fum"}, nil)
	ib.AppendValues([]int32{1, 2, 3, 4}, nil)

	bldr.Append(true)
	kb.AppendValues([]string{"Fee", "Fi", "Fo"}, nil)
	ib.AppendValues([]int32{5, 4, 3}, nil)

	bldr.AppendNull()

	bldr.Append(true)
	kb.AppendValues([]string{"Fo", "Fi", "Fee"}, nil)
	ib.AppendValues([]int32{-1, 2, 3}, []bool{false, true, true})

	arr := bldr.NewArray()
	defer arr.Release()

	fld := arrow.Field{Name: "mapped", Type: arr.DataType(), Nullable: true}
	cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
	defer arr.Release()
	tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
	defer cnk.Release()
	defer tbl.Release()

	ps.roundTripTable(mem, tbl, true)
}

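// TestArrowExtensionTypeRoundTrip round-trips a column of the test UUID
// extension type while the type is registered.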
func (ps *ParquetIOTestSuite) TestArrowExtensionTypeRoundTrip() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	extBuilder := array.NewExtensionBuilder(mem, types.NewUUIDType())
	defer extBuilder.Release()
	builder := types.NewUUIDBuilder(extBuilder)
	builder.Append(uuid.New())
	arr := builder.NewArray()
	defer arr.Release()

	fld := arrow.Field{Name: "uuid", Type: arr.DataType(), Nullable: true}
	cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
	defer arr.Release()
	tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
	defer cnk.Release()
	defer tbl.Release()

	ps.roundTripTable(mem, tbl, true)
}

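// TestArrowUnknownExtensionTypeRoundTrip writes a UUID extension column,
// unregisters the extension type before reading, and checks that the data
// comes back as its fixed_size_binary[16] storage type with the extension
// name and serialized metadata preserved in the field metadata.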
func (ps *ParquetIOTestSuite) TestArrowUnknownExtensionTypeRoundTrip() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	var written, expected arrow.Table

	{
		// build the table to write while the uuid extension type is still
		// registered, then unregister it so the read path sees an unknown
		// extension type.
		extType := types.NewUUIDType()
		bldr := array.NewExtensionBuilder(mem, extType)
		defer bldr.Release()

		bldr.Builder.(*array.FixedSizeBinaryBuilder).AppendValues(
			[][]byte{nil, []byte("abcdefghijklmno0"), []byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
			[]bool{false, true, true, true})

		arr := bldr.NewArray()
		defer arr.Release()

		if arrow.GetExtensionType("uuid") != nil {
			ps.NoError(arrow.UnregisterExtensionType("uuid"))
		}

		fld := arrow.Field{Name: "uuid", Type: arr.DataType(), Nullable: true}
		cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
		defer arr.Release()
		written = array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
		defer cnk.Release()
		defer written.Release()
	}

	{
		// the expected result of reading back: with the extension type
		// unknown, the column falls back to its fixed_size_binary[16] storage.
		bldr := array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: 16})
		defer bldr.Release()
		bldr.AppendValues(
			[][]byte{nil, []byte("abcdefghijklmno0"), []byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
			[]bool{false, true, true, true})

		arr := bldr.NewArray()
		defer arr.Release()

		fld := arrow.Field{Name: "uuid", Type: arr.DataType(), Nullable: true}
		cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
		defer arr.Release()
		expected = array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
		defer cnk.Release()
		defer expected.Release()
	}

	ps.Equal(expected.NumCols(), written.NumCols())
	ps.Equal(expected.NumRows(), written.NumRows())

	var buf bytes.Buffer
	props := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema(), pqarrow.WithAllocator(mem))

	writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
	ps.Require().NoError(pqarrow.WriteTable(written, &buf, written.NumRows(), writeProps, props))

	reader := ps.createReader(mem, buf.Bytes())
	defer reader.ParquetReader().Close()

	tbl := ps.readTable(reader)
	defer tbl.Release()

	ps.Equal(expected.NumCols(), tbl.NumCols())
	ps.Equal(expected.NumRows(), tbl.NumRows())

	exChunk := expected.Column(0).Data()
	tblChunk := tbl.Column(0).Data()

	ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
	exc := exChunk.Chunk(0)
	tbc := tblChunk.Chunk(0)
	ps.Truef(array.Equal(exc, tbc), "expected: %T %s\ngot: %T %s", exc, exc, tbc, tbc)

	expectedMd := arrow.MetadataFrom(map[string]string{
		ipc.ExtensionTypeKeyName:     "uuid",
		ipc.ExtensionMetadataKeyName: "uuid-serialized",
		"PARQUET:field_id":           "-1",
	})
	ps.Truef(expectedMd.Equal(tbl.Column(0).Field().Metadata), "expected: %v\ngot: %v", expectedMd, tbl.Column(0).Field().Metadata)
}

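// TestWriteTableMemoryAllocation writes a record covering primitive, struct,
// list, and extension columns and verifies that nothing remains allocated
// from the checked allocator once the writer is closed.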
func TestWriteTableMemoryAllocation(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	sc := arrow.NewSchema([]arrow.Field{
		{Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: true},
		{Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
		{Name: "struct_i64_f64", Type: arrow.StructOf(
			arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
			arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64, Nullable: true})},
		{Name: "arr_i64", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
		{Name: "uuid", Type: types.NewUUIDType(), Nullable: true},
	}, nil)

	bld := array.NewRecordBuilder(mem, sc)
	bld.Field(0).(*array.Float32Builder).Append(1.0)
	bld.Field(1).(*array.Int32Builder).Append(1)
	sbld := bld.Field(2).(*array.StructBuilder)
	sbld.Append(true)
	sbld.FieldBuilder(0).(*array.Int64Builder).Append(1)
	sbld.FieldBuilder(1).(*array.Float64Builder).Append(1.0)
	abld := bld.Field(3).(*array.ListBuilder)
	abld.Append(true)
	abld.ValueBuilder().(*array.Int64Builder).Append(2)
	bld.Field(4).(*types.UUIDBuilder).Append(uuid.MustParse("00000000-0000-0000-0000-000000000001"))

	rec := bld.NewRecord()
	bld.Release()

	var buf bytes.Buffer
	wr, err := pqarrow.NewFileWriter(sc, &buf,
		parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy)),
		pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
	require.NoError(t, err)

	require.NoError(t, wr.Write(rec))
	rec.Release()
	require.NoError(t, wr.Close())

	require.Zero(t, mem.CurrentAlloc())
}