...

Source file src/github.com/apache/arrow/go/v15/parquet/pqarrow/file_reader_test.go

Documentation: github.com/apache/arrow/go/v15/parquet/pqarrow

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package pqarrow_test
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"os"
    25  	"path/filepath"
    26  	"strings"
    27  	"testing"
    28  
    29  	"github.com/apache/arrow/go/v15/arrow"
    30  	"github.com/apache/arrow/go/v15/arrow/array"
    31  	"github.com/apache/arrow/go/v15/arrow/decimal128"
    32  	"github.com/apache/arrow/go/v15/arrow/float16"
    33  	"github.com/apache/arrow/go/v15/arrow/memory"
    34  	"github.com/apache/arrow/go/v15/parquet"
    35  	"github.com/apache/arrow/go/v15/parquet/file"
    36  	"github.com/apache/arrow/go/v15/parquet/pqarrow"
    37  	"github.com/stretchr/testify/assert"
    38  	"github.com/stretchr/testify/require"
    39  )
    40  
    41  func getDataDir() string {
    42  	datadir := os.Getenv("PARQUET_TEST_DATA")
    43  	if datadir == "" {
    44  		panic("please point PARQUET_TEST_DATA env var to the test data directory")
    45  	}
    46  	return datadir
    47  }
    48  
    49  func TestArrowReaderAdHocReadDecimals(t *testing.T) {
    50  	tests := []struct {
    51  		file string
    52  		typ  *arrow.Decimal128Type
    53  	}{
    54  		{"int32_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
    55  		{"int64_decimal", &arrow.Decimal128Type{Precision: 10, Scale: 2}},
    56  		{"fixed_length_decimal", &arrow.Decimal128Type{Precision: 25, Scale: 2}},
    57  		{"fixed_length_decimal_legacy", &arrow.Decimal128Type{Precision: 13, Scale: 2}},
    58  		{"byte_array_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
    59  	}
    60  
    61  	dataDir := getDataDir()
    62  	for _, tt := range tests {
    63  		t.Run(tt.file, func(t *testing.T) {
    64  			mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
    65  			defer mem.AssertSize(t, 0)
    66  
    67  			filename := filepath.Join(dataDir, tt.file+".parquet")
    68  			require.FileExists(t, filename)
    69  
    70  			rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
    71  			require.NoError(t, err)
    72  			defer rdr.Close()
    73  			arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
    74  			require.NoError(t, err)
    75  
    76  			tbl, err := arrowRdr.ReadTable(context.Background())
    77  			require.NoError(t, err)
    78  			defer tbl.Release()
    79  
    80  			assert.EqualValues(t, 1, tbl.NumCols())
    81  			assert.Truef(t, arrow.TypeEqual(tbl.Schema().Field(0).Type, tt.typ), "expected: %s\ngot: %s", tbl.Schema().Field(0).Type, tt.typ)
    82  
    83  			const expectedLen = 24
    84  			valCol := tbl.Column(0)
    85  
    86  			assert.EqualValues(t, expectedLen, valCol.Len())
    87  			assert.Len(t, valCol.Data().Chunks(), 1)
    88  
    89  			chunk := valCol.Data().Chunk(0)
    90  			bldr := array.NewDecimal128Builder(mem, tt.typ)
    91  			defer bldr.Release()
    92  			for i := 0; i < expectedLen; i++ {
    93  				bldr.Append(decimal128.FromI64(int64((i + 1) * 100)))
    94  			}
    95  
    96  			expectedArr := bldr.NewDecimal128Array()
    97  			defer expectedArr.Release()
    98  
    99  			assert.Truef(t, array.Equal(expectedArr, chunk), "expected: %s\ngot: %s", expectedArr, chunk)
   100  		})
   101  	}
   102  }
   103  
   104  func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
   105  	tests := []struct {
   106  		file string
   107  		len  int
   108  		vals []float16.Num
   109  	}{
   110  		{"float16_nonzeros_and_nans", 8,
   111  			[]float16.Num{
   112  				float16.New(1.0),
   113  				float16.New(-2.0),
   114  				float16.NaN(),
   115  				float16.New(0.0),
   116  				float16.New(-1.0),
   117  				float16.New(0.0).Negate(),
   118  				float16.New(2.0),
   119  			}},
   120  		{"float16_zeros_and_nans", 3,
   121  			[]float16.Num{
   122  				float16.New(0.0),
   123  				float16.NaN(),
   124  			}},
   125  	}
   126  
   127  	dataDir := getDataDir()
   128  	for _, tt := range tests {
   129  		t.Run(tt.file, func(t *testing.T) {
   130  			mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
   131  			defer mem.AssertSize(t, 0)
   132  
   133  			filename := filepath.Join(dataDir, tt.file+".parquet")
   134  			require.FileExists(t, filename)
   135  
   136  			rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
   137  			require.NoError(t, err)
   138  			defer rdr.Close()
   139  
   140  			arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
   141  			require.NoError(t, err)
   142  
   143  			tbl, err := arrowRdr.ReadTable(context.Background())
   144  			require.NoError(t, err)
   145  			defer tbl.Release()
   146  
   147  			assert.EqualValues(t, 1, tbl.NumCols())
   148  			assert.Truef(t, arrow.TypeEqual(tbl.Schema().Field(0).Type, &arrow.Float16Type{}), "expected: %s\ngot: %s", tbl.Schema().Field(0).Type, arrow.Float16Type{})
   149  
   150  			valCol := tbl.Column(0)
   151  			assert.EqualValues(t, tt.len, valCol.Len())
   152  			assert.Len(t, valCol.Data().Chunks(), 1)
   153  
   154  			chunk := valCol.Data().Chunk(0).(*array.Float16)
   155  			assert.True(t, chunk.IsNull(0))
   156  			for i := 0; i < tt.len-1; i++ {
   157  				expected := tt.vals[i]
   158  				actual := chunk.Value(i + 1)
   159  				if expected.IsNaN() {
   160  					// NaN representations aren't guaranteed to be exact on a binary level
   161  					assert.True(t, actual.IsNaN())
   162  				} else {
   163  					assert.Equal(t, expected.Uint16(), actual.Uint16())
   164  				}
   165  			}
   166  		})
   167  	}
   168  }
   169  
   170  func TestRecordReaderParallel(t *testing.T) {
   171  	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
   172  	defer mem.AssertSize(t, 0)
   173  
   174  	tbl := makeDateTimeTypesTable(mem, true, true)
   175  	defer tbl.Release()
   176  
   177  	var buf bytes.Buffer
   178  	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
   179  
   180  	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
   181  	require.NoError(t, err)
   182  
   183  	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
   184  	require.NoError(t, err)
   185  
   186  	sc, err := reader.Schema()
   187  	assert.NoError(t, err)
   188  	assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)
   189  
   190  	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
   191  	assert.NoError(t, err)
   192  	assert.NotNil(t, rr)
   193  	defer rr.Release()
   194  
   195  	records := make([]arrow.Record, 0)
   196  	for rr.Next() {
   197  		rec := rr.Record()
   198  		defer rec.Release()
   199  
   200  		assert.Truef(t, sc.Equal(rec.Schema()), "expected: %s\ngot: %s", sc, rec.Schema())
   201  		rec.Retain()
   202  		records = append(records, rec)
   203  	}
   204  
   205  	assert.False(t, rr.Next())
   206  
   207  	tr := array.NewTableReader(tbl, 3)
   208  	defer tr.Release()
   209  
   210  	assert.True(t, tr.Next())
   211  	assert.Truef(t, array.RecordEqual(tr.Record(), records[0]), "expected: %s\ngot: %s", tr.Record(), records[0])
   212  	assert.True(t, tr.Next())
   213  	assert.Truef(t, array.RecordEqual(tr.Record(), records[1]), "expected: %s\ngot: %s", tr.Record(), records[1])
   214  }
   215  
   216  func TestRecordReaderSerial(t *testing.T) {
   217  	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
   218  	defer mem.AssertSize(t, 0)
   219  
   220  	tbl := makeDateTimeTypesTable(mem, true, true)
   221  	defer tbl.Release()
   222  
   223  	var buf bytes.Buffer
   224  	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
   225  
   226  	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
   227  	require.NoError(t, err)
   228  
   229  	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 2}, mem)
   230  	require.NoError(t, err)
   231  
   232  	sc, err := reader.Schema()
   233  	assert.NoError(t, err)
   234  	assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)
   235  
   236  	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
   237  	assert.NoError(t, err)
   238  	assert.NotNil(t, rr)
   239  	defer rr.Release()
   240  
   241  	tr := array.NewTableReader(tbl, 2)
   242  	defer tr.Release()
   243  
   244  	rec, err := rr.Read()
   245  	assert.NoError(t, err)
   246  	tr.Next()
   247  	assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
   248  
   249  	rec, err = rr.Read()
   250  	assert.NoError(t, err)
   251  	tr.Next()
   252  	assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
   253  
   254  	rec, err = rr.Read()
   255  	assert.NoError(t, err)
   256  	tr.Next()
   257  	assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
   258  
   259  	rec, err = rr.Read()
   260  	assert.Same(t, io.EOF, err)
   261  	assert.Nil(t, rec)
   262  }
   263  
   264  func TestFileReaderWriterMetadata(t *testing.T) {
   265  	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
   266  	defer mem.AssertSize(t, 0)
   267  
   268  	tbl := makeDateTimeTypesTable(mem, true, true)
   269  	defer tbl.Release()
   270  
   271  	meta := arrow.NewMetadata([]string{"foo", "bar"}, []string{"bar", "baz"})
   272  	sc := arrow.NewSchema(tbl.Schema().Fields(), &meta)
   273  
   274  	var buf bytes.Buffer
   275  	writer, err := pqarrow.NewFileWriter(sc, &buf, nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
   276  	require.NoError(t, err)
   277  	require.NoError(t, writer.WriteTable(tbl, tbl.NumRows()))
   278  	require.NoError(t, writer.Close())
   279  
   280  	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
   281  	require.NoError(t, err)
   282  	defer pf.Close()
   283  
   284  	kvMeta := pf.MetaData().KeyValueMetadata()
   285  	assert.Equal(t, []string{"foo", "bar"}, kvMeta.Keys())
   286  	assert.Equal(t, []string{"bar", "baz"}, kvMeta.Values())
   287  }
   288  
   289  func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
   290  	schema := arrow.NewSchema([]arrow.Field{
   291  		{Name: "zero", Type: arrow.PrimitiveTypes.Float64},
   292  		{Name: "g", Type: arrow.StructOf(
   293  			arrow.Field{Name: "one", Type: arrow.PrimitiveTypes.Float64},
   294  			arrow.Field{Name: "two", Type: arrow.PrimitiveTypes.Float64},
   295  			arrow.Field{Name: "three", Type: arrow.PrimitiveTypes.Float64},
   296  		)},
   297  	}, nil)
   298  
   299  	// generate Parquet data with four columns
   300  	// that are represented by two logical fields
   301  	data := `[
   302  		{
   303  			"zero": 1,
   304  			"g": {
   305  				"one": 1,
   306  				"two": 1,
   307  				"three": 1
   308  			}
   309  		},
   310  		{
   311  			"zero": 2,
   312  			"g": {
   313  				"one": 2,
   314  				"two": 2,
   315  				"three": 2
   316  			}
   317  		}
   318  	]`
   319  
   320  	record, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(data))
   321  	require.NoError(t, err)
   322  
   323  	output := &bytes.Buffer{}
   324  	writer, err := pqarrow.NewFileWriter(schema, output, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
   325  	require.NoError(t, err)
   326  
   327  	require.NoError(t, writer.Write(record))
   328  	require.NoError(t, writer.Close())
   329  
   330  	fileReader, err := file.NewParquetReader(bytes.NewReader(output.Bytes()))
   331  	require.NoError(t, err)
   332  
   333  	arrowReader, err := pqarrow.NewFileReader(fileReader, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
   334  	require.NoError(t, err)
   335  
   336  	// assert that errors are returned for indexes outside the bounds of the logical fields (instead of the physical columns)
   337  	ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)
   338  	assert.Greater(t, fileReader.NumRowGroups(), 0)
   339  	for rowGroupIndex := 0; rowGroupIndex < fileReader.NumRowGroups(); rowGroupIndex += 1 {
   340  		rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
   341  		for fieldNum := 0; fieldNum < schema.NumFields(); fieldNum += 1 {
   342  			_, err := rowGroupReader.Column(fieldNum).Read(ctx)
   343  			assert.NoError(t, err, "reading field num: %d", fieldNum)
   344  		}
   345  
   346  		_, subZeroErr := rowGroupReader.Column(-1).Read(ctx)
   347  		assert.Error(t, subZeroErr)
   348  
   349  		_, tooHighErr := rowGroupReader.Column(schema.NumFields()).Read(ctx)
   350  		assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
   351  	}
   352  }
   353  

View as plain text