package pqarrow_test

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"testing"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/decimal128"
	"github.com/apache/arrow/go/v15/arrow/float16"
	"github.com/apache/arrow/go/v15/arrow/memory"
	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/pqarrow"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
40
// getDataDir returns the directory containing the parquet reference test
// files, taken from the PARQUET_TEST_DATA environment variable. It panics
// when the variable is unset or empty so that misconfigured runs fail loudly
// before any test opens a file.
func getDataDir() string {
	dir, ok := os.LookupEnv("PARQUET_TEST_DATA")
	if !ok || dir == "" {
		panic("please point PARQUET_TEST_DATA env var to the test data directory")
	}
	return dir
}
48
49 func TestArrowReaderAdHocReadDecimals(t *testing.T) {
50 tests := []struct {
51 file string
52 typ *arrow.Decimal128Type
53 }{
54 {"int32_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
55 {"int64_decimal", &arrow.Decimal128Type{Precision: 10, Scale: 2}},
56 {"fixed_length_decimal", &arrow.Decimal128Type{Precision: 25, Scale: 2}},
57 {"fixed_length_decimal_legacy", &arrow.Decimal128Type{Precision: 13, Scale: 2}},
58 {"byte_array_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
59 }
60
61 dataDir := getDataDir()
62 for _, tt := range tests {
63 t.Run(tt.file, func(t *testing.T) {
64 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
65 defer mem.AssertSize(t, 0)
66
67 filename := filepath.Join(dataDir, tt.file+".parquet")
68 require.FileExists(t, filename)
69
70 rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
71 require.NoError(t, err)
72 defer rdr.Close()
73 arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
74 require.NoError(t, err)
75
76 tbl, err := arrowRdr.ReadTable(context.Background())
77 require.NoError(t, err)
78 defer tbl.Release()
79
80 assert.EqualValues(t, 1, tbl.NumCols())
81 assert.Truef(t, arrow.TypeEqual(tbl.Schema().Field(0).Type, tt.typ), "expected: %s\ngot: %s", tbl.Schema().Field(0).Type, tt.typ)
82
83 const expectedLen = 24
84 valCol := tbl.Column(0)
85
86 assert.EqualValues(t, expectedLen, valCol.Len())
87 assert.Len(t, valCol.Data().Chunks(), 1)
88
89 chunk := valCol.Data().Chunk(0)
90 bldr := array.NewDecimal128Builder(mem, tt.typ)
91 defer bldr.Release()
92 for i := 0; i < expectedLen; i++ {
93 bldr.Append(decimal128.FromI64(int64((i + 1) * 100)))
94 }
95
96 expectedArr := bldr.NewDecimal128Array()
97 defer expectedArr.Release()
98
99 assert.Truef(t, array.Equal(expectedArr, chunk), "expected: %s\ngot: %s", expectedArr, chunk)
100 })
101 }
102 }
103
104 func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
105 tests := []struct {
106 file string
107 len int
108 vals []float16.Num
109 }{
110 {"float16_nonzeros_and_nans", 8,
111 []float16.Num{
112 float16.New(1.0),
113 float16.New(-2.0),
114 float16.NaN(),
115 float16.New(0.0),
116 float16.New(-1.0),
117 float16.New(0.0).Negate(),
118 float16.New(2.0),
119 }},
120 {"float16_zeros_and_nans", 3,
121 []float16.Num{
122 float16.New(0.0),
123 float16.NaN(),
124 }},
125 }
126
127 dataDir := getDataDir()
128 for _, tt := range tests {
129 t.Run(tt.file, func(t *testing.T) {
130 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
131 defer mem.AssertSize(t, 0)
132
133 filename := filepath.Join(dataDir, tt.file+".parquet")
134 require.FileExists(t, filename)
135
136 rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
137 require.NoError(t, err)
138 defer rdr.Close()
139
140 arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
141 require.NoError(t, err)
142
143 tbl, err := arrowRdr.ReadTable(context.Background())
144 require.NoError(t, err)
145 defer tbl.Release()
146
147 assert.EqualValues(t, 1, tbl.NumCols())
148 assert.Truef(t, arrow.TypeEqual(tbl.Schema().Field(0).Type, &arrow.Float16Type{}), "expected: %s\ngot: %s", tbl.Schema().Field(0).Type, arrow.Float16Type{})
149
150 valCol := tbl.Column(0)
151 assert.EqualValues(t, tt.len, valCol.Len())
152 assert.Len(t, valCol.Data().Chunks(), 1)
153
154 chunk := valCol.Data().Chunk(0).(*array.Float16)
155 assert.True(t, chunk.IsNull(0))
156 for i := 0; i < tt.len-1; i++ {
157 expected := tt.vals[i]
158 actual := chunk.Value(i + 1)
159 if expected.IsNaN() {
160
161 assert.True(t, actual.IsNaN())
162 } else {
163 assert.Equal(t, expected.Uint16(), actual.Uint16())
164 }
165 }
166 })
167 }
168 }
169
// TestRecordReaderParallel round-trips a date/time-typed table through an
// in-memory parquet buffer and reads it back with a parallel record reader
// (BatchSize 3), checking the first two batches against the original table.
func TestRecordReaderParallel(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	tbl := makeDateTimeTypesTable(mem, true, true)
	defer tbl.Release()

	// write the table to an in-memory parquet file in a single row group
	var buf bytes.Buffer
	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))

	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
	require.NoError(t, err)

	// Parallel: true exercises the concurrent column-reading path
	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
	require.NoError(t, err)

	sc, err := reader.Schema()
	assert.NoError(t, err)
	assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)

	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
	assert.NoError(t, err)
	assert.NotNil(t, rr)
	defer rr.Release()

	records := make([]arrow.Record, 0)
	for rr.Next() {
		rec := rr.Record()
		// the Retain below takes an extra reference so the record stays valid
		// in the records slice after the reader advances; this deferred Release
		// balances it at function exit (after the comparisons below).
		defer rec.Release()

		assert.Truef(t, sc.Equal(rec.Schema()), "expected: %s\ngot: %s", sc, rec.Schema())
		rec.Retain()
		records = append(records, rec)
	}

	// reader must be exhausted after the loop
	assert.False(t, rr.Next())

	// re-read the original table in matching 3-row batches and compare
	tr := array.NewTableReader(tbl, 3)
	defer tr.Release()

	assert.True(t, tr.Next())
	assert.Truef(t, array.RecordEqual(tr.Record(), records[0]), "expected: %s\ngot: %s", tr.Record(), records[0])
	assert.True(t, tr.Next())
	assert.Truef(t, array.RecordEqual(tr.Record(), records[1]), "expected: %s\ngot: %s", tr.Record(), records[1])
}
215
216 func TestRecordReaderSerial(t *testing.T) {
217 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
218 defer mem.AssertSize(t, 0)
219
220 tbl := makeDateTimeTypesTable(mem, true, true)
221 defer tbl.Release()
222
223 var buf bytes.Buffer
224 require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
225
226 pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
227 require.NoError(t, err)
228
229 reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 2}, mem)
230 require.NoError(t, err)
231
232 sc, err := reader.Schema()
233 assert.NoError(t, err)
234 assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)
235
236 rr, err := reader.GetRecordReader(context.Background(), nil, nil)
237 assert.NoError(t, err)
238 assert.NotNil(t, rr)
239 defer rr.Release()
240
241 tr := array.NewTableReader(tbl, 2)
242 defer tr.Release()
243
244 rec, err := rr.Read()
245 assert.NoError(t, err)
246 tr.Next()
247 assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
248
249 rec, err = rr.Read()
250 assert.NoError(t, err)
251 tr.Next()
252 assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
253
254 rec, err = rr.Read()
255 assert.NoError(t, err)
256 tr.Next()
257 assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
258
259 rec, err = rr.Read()
260 assert.Same(t, io.EOF, err)
261 assert.Nil(t, rec)
262 }
263
264 func TestFileReaderWriterMetadata(t *testing.T) {
265 mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
266 defer mem.AssertSize(t, 0)
267
268 tbl := makeDateTimeTypesTable(mem, true, true)
269 defer tbl.Release()
270
271 meta := arrow.NewMetadata([]string{"foo", "bar"}, []string{"bar", "baz"})
272 sc := arrow.NewSchema(tbl.Schema().Fields(), &meta)
273
274 var buf bytes.Buffer
275 writer, err := pqarrow.NewFileWriter(sc, &buf, nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
276 require.NoError(t, err)
277 require.NoError(t, writer.WriteTable(tbl, tbl.NumRows()))
278 require.NoError(t, writer.Close())
279
280 pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
281 require.NoError(t, err)
282 defer pf.Close()
283
284 kvMeta := pf.MetaData().KeyValueMetadata()
285 assert.Equal(t, []string{"foo", "bar"}, kvMeta.Keys())
286 assert.Equal(t, []string{"bar", "baz"}, kvMeta.Values())
287 }
288
289 func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
290 schema := arrow.NewSchema([]arrow.Field{
291 {Name: "zero", Type: arrow.PrimitiveTypes.Float64},
292 {Name: "g", Type: arrow.StructOf(
293 arrow.Field{Name: "one", Type: arrow.PrimitiveTypes.Float64},
294 arrow.Field{Name: "two", Type: arrow.PrimitiveTypes.Float64},
295 arrow.Field{Name: "three", Type: arrow.PrimitiveTypes.Float64},
296 )},
297 }, nil)
298
299
300
301 data := `[
302 {
303 "zero": 1,
304 "g": {
305 "one": 1,
306 "two": 1,
307 "three": 1
308 }
309 },
310 {
311 "zero": 2,
312 "g": {
313 "one": 2,
314 "two": 2,
315 "three": 2
316 }
317 }
318 ]`
319
320 record, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(data))
321 require.NoError(t, err)
322
323 output := &bytes.Buffer{}
324 writer, err := pqarrow.NewFileWriter(schema, output, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
325 require.NoError(t, err)
326
327 require.NoError(t, writer.Write(record))
328 require.NoError(t, writer.Close())
329
330 fileReader, err := file.NewParquetReader(bytes.NewReader(output.Bytes()))
331 require.NoError(t, err)
332
333 arrowReader, err := pqarrow.NewFileReader(fileReader, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
334 require.NoError(t, err)
335
336
337 ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)
338 assert.Greater(t, fileReader.NumRowGroups(), 0)
339 for rowGroupIndex := 0; rowGroupIndex < fileReader.NumRowGroups(); rowGroupIndex += 1 {
340 rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
341 for fieldNum := 0; fieldNum < schema.NumFields(); fieldNum += 1 {
342 _, err := rowGroupReader.Column(fieldNum).Read(ctx)
343 assert.NoError(t, err, "reading field num: %d", fieldNum)
344 }
345
346 _, subZeroErr := rowGroupReader.Column(-1).Read(ctx)
347 assert.Error(t, subZeroErr)
348
349 _, tooHighErr := rowGroupReader.Column(schema.NumFields()).Read(ctx)
350 assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
351 }
352 }
353