...

Source file src/github.com/apache/arrow/go/v15/parquet/internal/encoding/encoding_test.go

Documentation: github.com/apache/arrow/go/v15/parquet/internal/encoding

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package encoding_test
    18  
    19  import (
    20  	"bufio"
    21  	"fmt"
    22  	"os"
    23  	"path"
    24  	"reflect"
    25  	"strconv"
    26  	"testing"
    27  	"unsafe"
    28  
    29  	"github.com/apache/arrow/go/v15/arrow"
    30  	"github.com/apache/arrow/go/v15/arrow/bitutil"
    31  	"github.com/apache/arrow/go/v15/arrow/memory"
    32  	"github.com/apache/arrow/go/v15/parquet"
    33  	"github.com/apache/arrow/go/v15/parquet/internal/encoding"
    34  	"github.com/apache/arrow/go/v15/parquet/internal/testutils"
    35  	"github.com/apache/arrow/go/v15/parquet/schema"
    36  	"github.com/stretchr/testify/assert"
    37  	"github.com/stretchr/testify/require"
    38  	"github.com/stretchr/testify/suite"
    39  )
    40  
    41  type nodeFactory func(string, parquet.Repetition, int32) *schema.PrimitiveNode
    42  
    43  func createNodeFactory(t reflect.Type) nodeFactory {
    44  	switch t {
    45  	case reflect.TypeOf(true):
    46  		return schema.NewBooleanNode
    47  	case reflect.TypeOf(int32(0)):
    48  		return schema.NewInt32Node
    49  	case reflect.TypeOf(int64(0)):
    50  		return schema.NewInt64Node
    51  	case reflect.TypeOf(parquet.Int96{}):
    52  		return schema.NewInt96Node
    53  	case reflect.TypeOf(float32(0)):
    54  		return schema.NewFloat32Node
    55  	case reflect.TypeOf(float64(0)):
    56  		return schema.NewFloat64Node
    57  	case reflect.TypeOf(parquet.ByteArray{}):
    58  		return schema.NewByteArrayNode
    59  	case reflect.TypeOf(parquet.FixedLenByteArray{}):
    60  		return func(name string, rep parquet.Repetition, field int32) *schema.PrimitiveNode {
    61  			return schema.NewFixedLenByteArrayNode(name, rep, 12, field)
    62  		}
    63  	}
    64  	return nil
    65  }
    66  
    67  func initdata(t reflect.Type, drawbuf, decodebuf []byte, nvals, repeats int, heap *memory.Buffer) (interface{}, interface{}) {
    68  	switch t {
    69  	case reflect.TypeOf(true):
    70  		draws := *(*[]bool)(unsafe.Pointer(&drawbuf))
    71  		decode := *(*[]bool)(unsafe.Pointer(&decodebuf))
    72  		testutils.InitValues(draws[:nvals], heap)
    73  
    74  		for j := 1; j < repeats; j++ {
    75  			for k := 0; k < nvals; k++ {
    76  				draws[nvals*j+k] = draws[k]
    77  			}
    78  		}
    79  
    80  		return draws[:nvals*repeats], decode[:nvals*repeats]
    81  	case reflect.TypeOf(int32(0)):
    82  		draws := arrow.Int32Traits.CastFromBytes(drawbuf)
    83  		decode := arrow.Int32Traits.CastFromBytes(decodebuf)
    84  		testutils.InitValues(draws[:nvals], heap)
    85  
    86  		for j := 1; j < repeats; j++ {
    87  			for k := 0; k < nvals; k++ {
    88  				draws[nvals*j+k] = draws[k]
    89  			}
    90  		}
    91  
    92  		return draws[:nvals*repeats], decode[:nvals*repeats]
    93  	case reflect.TypeOf(int64(0)):
    94  		draws := arrow.Int64Traits.CastFromBytes(drawbuf)
    95  		decode := arrow.Int64Traits.CastFromBytes(decodebuf)
    96  		testutils.InitValues(draws[:nvals], heap)
    97  
    98  		for j := 1; j < repeats; j++ {
    99  			for k := 0; k < nvals; k++ {
   100  				draws[nvals*j+k] = draws[k]
   101  			}
   102  		}
   103  
   104  		return draws[:nvals*repeats], decode[:nvals*repeats]
   105  	case reflect.TypeOf(parquet.Int96{}):
   106  		draws := parquet.Int96Traits.CastFromBytes(drawbuf)
   107  		decode := parquet.Int96Traits.CastFromBytes(decodebuf)
   108  		testutils.InitValues(draws[:nvals], heap)
   109  
   110  		for j := 1; j < repeats; j++ {
   111  			for k := 0; k < nvals; k++ {
   112  				draws[nvals*j+k] = draws[k]
   113  			}
   114  		}
   115  
   116  		return draws[:nvals*repeats], decode[:nvals*repeats]
   117  	case reflect.TypeOf(float32(0)):
   118  		draws := arrow.Float32Traits.CastFromBytes(drawbuf)
   119  		decode := arrow.Float32Traits.CastFromBytes(decodebuf)
   120  		testutils.InitValues(draws[:nvals], heap)
   121  
   122  		for j := 1; j < repeats; j++ {
   123  			for k := 0; k < nvals; k++ {
   124  				draws[nvals*j+k] = draws[k]
   125  			}
   126  		}
   127  
   128  		return draws[:nvals*repeats], decode[:nvals*repeats]
   129  	case reflect.TypeOf(float64(0)):
   130  		draws := arrow.Float64Traits.CastFromBytes(drawbuf)
   131  		decode := arrow.Float64Traits.CastFromBytes(decodebuf)
   132  		testutils.InitValues(draws[:nvals], heap)
   133  
   134  		for j := 1; j < repeats; j++ {
   135  			for k := 0; k < nvals; k++ {
   136  				draws[nvals*j+k] = draws[k]
   137  			}
   138  		}
   139  
   140  		return draws[:nvals*repeats], decode[:nvals*repeats]
   141  	case reflect.TypeOf(parquet.ByteArray{}):
   142  		draws := make([]parquet.ByteArray, nvals*repeats)
   143  		decode := make([]parquet.ByteArray, nvals*repeats)
   144  		testutils.InitValues(draws[:nvals], heap)
   145  
   146  		for j := 1; j < repeats; j++ {
   147  			for k := 0; k < nvals; k++ {
   148  				draws[nvals*j+k] = draws[k]
   149  			}
   150  		}
   151  
   152  		return draws[:nvals*repeats], decode[:nvals*repeats]
   153  	case reflect.TypeOf(parquet.FixedLenByteArray{}):
   154  		draws := make([]parquet.FixedLenByteArray, nvals*repeats)
   155  		decode := make([]parquet.FixedLenByteArray, nvals*repeats)
   156  		testutils.InitValues(draws[:nvals], heap)
   157  
   158  		for j := 1; j < repeats; j++ {
   159  			for k := 0; k < nvals; k++ {
   160  				draws[nvals*j+k] = draws[k]
   161  			}
   162  		}
   163  
   164  		return draws[:nvals*repeats], decode[:nvals*repeats]
   165  	}
   166  	return nil, nil
   167  }
   168  
   169  func encode(enc encoding.TypedEncoder, vals interface{}) {
   170  	switch v := vals.(type) {
   171  	case []bool:
   172  		enc.(encoding.BooleanEncoder).Put(v)
   173  	case []int32:
   174  		enc.(encoding.Int32Encoder).Put(v)
   175  	case []int64:
   176  		enc.(encoding.Int64Encoder).Put(v)
   177  	case []parquet.Int96:
   178  		enc.(encoding.Int96Encoder).Put(v)
   179  	case []float32:
   180  		enc.(encoding.Float32Encoder).Put(v)
   181  	case []float64:
   182  		enc.(encoding.Float64Encoder).Put(v)
   183  	case []parquet.ByteArray:
   184  		enc.(encoding.ByteArrayEncoder).Put(v)
   185  	case []parquet.FixedLenByteArray:
   186  		enc.(encoding.FixedLenByteArrayEncoder).Put(v)
   187  	}
   188  }
   189  
   190  func encodeSpaced(enc encoding.TypedEncoder, vals interface{}, validBits []byte, validBitsOffset int64) {
   191  	switch v := vals.(type) {
   192  	case []bool:
   193  		enc.(encoding.BooleanEncoder).PutSpaced(v, validBits, validBitsOffset)
   194  	case []int32:
   195  		enc.(encoding.Int32Encoder).PutSpaced(v, validBits, validBitsOffset)
   196  	case []int64:
   197  		enc.(encoding.Int64Encoder).PutSpaced(v, validBits, validBitsOffset)
   198  	case []parquet.Int96:
   199  		enc.(encoding.Int96Encoder).PutSpaced(v, validBits, validBitsOffset)
   200  	case []float32:
   201  		enc.(encoding.Float32Encoder).PutSpaced(v, validBits, validBitsOffset)
   202  	case []float64:
   203  		enc.(encoding.Float64Encoder).PutSpaced(v, validBits, validBitsOffset)
   204  	case []parquet.ByteArray:
   205  		enc.(encoding.ByteArrayEncoder).PutSpaced(v, validBits, validBitsOffset)
   206  	case []parquet.FixedLenByteArray:
   207  		enc.(encoding.FixedLenByteArrayEncoder).PutSpaced(v, validBits, validBitsOffset)
   208  	}
   209  }
   210  
   211  func decode(dec encoding.TypedDecoder, out interface{}) (int, error) {
   212  	switch v := out.(type) {
   213  	case []bool:
   214  		return dec.(encoding.BooleanDecoder).Decode(v)
   215  	case []int32:
   216  		return dec.(encoding.Int32Decoder).Decode(v)
   217  	case []int64:
   218  		return dec.(encoding.Int64Decoder).Decode(v)
   219  	case []parquet.Int96:
   220  		return dec.(encoding.Int96Decoder).Decode(v)
   221  	case []float32:
   222  		return dec.(encoding.Float32Decoder).Decode(v)
   223  	case []float64:
   224  		return dec.(encoding.Float64Decoder).Decode(v)
   225  	case []parquet.ByteArray:
   226  		return dec.(encoding.ByteArrayDecoder).Decode(v)
   227  	case []parquet.FixedLenByteArray:
   228  		return dec.(encoding.FixedLenByteArrayDecoder).Decode(v)
   229  	}
   230  	return 0, nil
   231  }
   232  
   233  func decodeSpaced(dec encoding.TypedDecoder, out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
   234  	switch v := out.(type) {
   235  	case []bool:
   236  		return dec.(encoding.BooleanDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   237  	case []int32:
   238  		return dec.(encoding.Int32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   239  	case []int64:
   240  		return dec.(encoding.Int64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   241  	case []parquet.Int96:
   242  		return dec.(encoding.Int96Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   243  	case []float32:
   244  		return dec.(encoding.Float32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   245  	case []float64:
   246  		return dec.(encoding.Float64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   247  	case []parquet.ByteArray:
   248  		return dec.(encoding.ByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   249  	case []parquet.FixedLenByteArray:
   250  		return dec.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
   251  	}
   252  	return 0, nil
   253  }
   254  
// BaseEncodingTestSuite is a testify suite that round-trips values of a single
// Go element type (typ) through the parquet encoders and decoders.
// TestEncoding instantiates it once per element type.
type BaseEncodingTestSuite struct {
	suite.Suite

	// descr is the optional column descriptor rebuilt by SetupTest.
	descr *schema.Column
	// typeLen caches descr.TypeLength() (set in SetupTest).
	typeLen int
	// mem is the allocator for the suite's buffers and decoders;
	// SetupSuite sets it to memory.DefaultAllocator.
	mem memory.Allocator
	// typ is the Go element type under test, supplied by the test driver.
	typ reflect.Type

	// nvalues is the total value count (nvalues*repeats) prepared by initData.
	nvalues int
	// heap is a scratch buffer handed to testutils.InitValues.
	heap *memory.Buffer
	// inputBytes/outputBytes back the draws/decodeBuf slices for the
	// fixed-width types (see initdata).
	inputBytes  *memory.Buffer
	outputBytes *memory.Buffer
	// nodeFactory builds the schema node for typ (see createNodeFactory).
	nodeFactory nodeFactory

	// draws holds the typed slice of values to encode.
	draws interface{}
	// decodeBuf is the typed slice that receives decoded values.
	decodeBuf interface{}
}
   272  
   273  func (b *BaseEncodingTestSuite) SetupSuite() {
   274  	b.mem = memory.DefaultAllocator
   275  	b.inputBytes = memory.NewResizableBuffer(b.mem)
   276  	b.outputBytes = memory.NewResizableBuffer(b.mem)
   277  	b.heap = memory.NewResizableBuffer(b.mem)
   278  	b.nodeFactory = createNodeFactory(b.typ)
   279  }
   280  
   281  func (b *BaseEncodingTestSuite) TearDownSuite() {
   282  	b.inputBytes.Release()
   283  	b.outputBytes.Release()
   284  	b.heap.Release()
   285  }
   286  
   287  func (b *BaseEncodingTestSuite) SetupTest() {
   288  	b.descr = schema.NewColumn(b.nodeFactory("name", parquet.Repetitions.Optional, -1), 0, 0)
   289  	b.typeLen = int(b.descr.TypeLength())
   290  }
   291  
   292  func (b *BaseEncodingTestSuite) initData(nvalues, repeats int) {
   293  	b.nvalues = nvalues * repeats
   294  	b.inputBytes.ResizeNoShrink(b.nvalues * int(b.typ.Size()))
   295  	b.outputBytes.ResizeNoShrink(b.nvalues * int(b.typ.Size()))
   296  	memory.Set(b.inputBytes.Buf(), 0)
   297  	memory.Set(b.outputBytes.Buf(), 0)
   298  
   299  	b.draws, b.decodeBuf = initdata(b.typ, b.inputBytes.Buf(), b.outputBytes.Buf(), nvalues, repeats, b.heap)
   300  }
   301  
   302  func (b *BaseEncodingTestSuite) encodeTestData(e parquet.Encoding) (encoding.Buffer, error) {
   303  	enc := encoding.NewEncoder(testutils.TypeToParquetType(b.typ), e, false, b.descr, memory.DefaultAllocator)
   304  	b.Equal(e, enc.Encoding())
   305  	b.Equal(b.descr.PhysicalType(), enc.Type())
   306  	encode(enc, reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface())
   307  	return enc.FlushValues()
   308  }
   309  
   310  func (b *BaseEncodingTestSuite) decodeTestData(e parquet.Encoding, buf []byte) {
   311  	dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
   312  	b.Equal(e, dec.Encoding())
   313  	b.Equal(b.descr.PhysicalType(), dec.Type())
   314  
   315  	dec.SetData(b.nvalues, buf)
   316  	decoded, _ := decode(dec, b.decodeBuf)
   317  	b.Equal(b.nvalues, decoded)
   318  	b.Equal(reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), reflect.ValueOf(b.decodeBuf).Slice(0, b.nvalues).Interface())
   319  }
   320  
   321  func (b *BaseEncodingTestSuite) encodeTestDataSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (encoding.Buffer, error) {
   322  	enc := encoding.NewEncoder(testutils.TypeToParquetType(b.typ), e, false, b.descr, memory.DefaultAllocator)
   323  	encodeSpaced(enc, reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), validBits, validBitsOffset)
   324  	return enc.FlushValues()
   325  }
   326  
   327  func (b *BaseEncodingTestSuite) decodeTestDataSpaced(e parquet.Encoding, nullCount int, buf []byte, validBits []byte, validBitsOffset int64) {
   328  	dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
   329  	dec.SetData(b.nvalues-nullCount, buf)
   330  	decoded, _ := decodeSpaced(dec, b.decodeBuf, nullCount, validBits, validBitsOffset)
   331  	b.Equal(b.nvalues, decoded)
   332  
   333  	drawval := reflect.ValueOf(b.draws)
   334  	decodeval := reflect.ValueOf(b.decodeBuf)
   335  	for j := 0; j < b.nvalues; j++ {
   336  		if bitutil.BitIsSet(validBits, int(validBitsOffset)+j) {
   337  			b.Equal(drawval.Index(j).Interface(), decodeval.Index(j).Interface())
   338  		}
   339  	}
   340  }
   341  
   342  func (b *BaseEncodingTestSuite) checkRoundTrip(e parquet.Encoding) {
   343  	buf, _ := b.encodeTestData(e)
   344  	defer buf.Release()
   345  	b.decodeTestData(e, buf.Bytes())
   346  }
   347  
   348  func (b *BaseEncodingTestSuite) checkRoundTripSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) {
   349  	buf, _ := b.encodeTestDataSpaced(e, validBits, validBitsOffset)
   350  	defer buf.Release()
   351  
   352  	nullCount := 0
   353  	for i := 0; i < b.nvalues; i++ {
   354  		if bitutil.BitIsNotSet(validBits, int(validBitsOffset)+i) {
   355  			nullCount++
   356  		}
   357  	}
   358  	b.decodeTestDataSpaced(e, nullCount, buf.Bytes(), validBits, validBitsOffset)
   359  }
   360  
   361  func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
   362  	b.initData(10000, 1)
   363  	b.checkRoundTrip(parquet.Encodings.Plain)
   364  }
   365  
   366  func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
   367  	switch b.typ {
   368  	case reflect.TypeOf(true):
   369  		b.initData(2000, 200)
   370  		b.checkRoundTrip(parquet.Encodings.RLE)
   371  	default:
   372  		b.T().SkipNow()
   373  	}
   374  }
   375  
   376  func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
   377  	b.initData(10000, 1)
   378  
   379  	switch b.typ {
   380  	case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
   381  		b.checkRoundTrip(parquet.Encodings.DeltaBinaryPacked)
   382  	default:
   383  		b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaBinaryPacked) })
   384  	}
   385  }
   386  
   387  func (b *BaseEncodingTestSuite) TestDeltaLengthByteArrayRoundTrip() {
   388  	b.initData(10000, 1)
   389  
   390  	switch b.typ {
   391  	case reflect.TypeOf(parquet.ByteArray{}):
   392  		b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray)
   393  	default:
   394  		b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray) })
   395  	}
   396  }
   397  
   398  func (b *BaseEncodingTestSuite) TestDeltaByteArrayRoundTrip() {
   399  	b.initData(10000, 1)
   400  
   401  	switch b.typ {
   402  	case reflect.TypeOf(parquet.ByteArray{}):
   403  		b.checkRoundTrip(parquet.Encodings.DeltaByteArray)
   404  	default:
   405  		b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray) })
   406  	}
   407  }
   408  
   409  func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
   410  	exec := func(vals, repeats int, validBitsOffset int64, nullProb float64) {
   411  		b.Run(fmt.Sprintf("%d vals %d repeats %d offset %0.3f null", vals, repeats, validBitsOffset, 1-nullProb), func() {
   412  			b.initData(vals, repeats)
   413  
   414  			size := int64(b.nvalues) + validBitsOffset
   415  			r := testutils.NewRandomArrayGenerator(1923)
   416  			arr := r.Uint8(size, 0, 100, 1-nullProb)
   417  			validBits := arr.NullBitmapBytes()
   418  			if validBits != nil {
   419  				b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset)
   420  				switch b.typ {
   421  				case reflect.TypeOf(false):
   422  					b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
   423  				case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
   424  					b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset)
   425  				case reflect.TypeOf(parquet.ByteArray{}):
   426  					b.checkRoundTripSpaced(parquet.Encodings.DeltaLengthByteArray, validBits, validBitsOffset)
   427  					b.checkRoundTripSpaced(parquet.Encodings.DeltaByteArray, validBits, validBitsOffset)
   428  				}
   429  			}
   430  		})
   431  	}
   432  
   433  	const (
   434  		avx512Size    = 64
   435  		simdSize      = avx512Size
   436  		multiSimdSize = simdSize * 33
   437  	)
   438  
   439  	for _, nullProb := range []float64{0.001, 0.1, 0.5, 0.9, 0.999} {
   440  		// Test with both size and offset up to 3 simd block
   441  		for i := 1; i < simdSize*3; i++ {
   442  			exec(i, 1, 0, nullProb)
   443  			exec(i, 1, int64(i+1), nullProb)
   444  		}
   445  		// large block and offset
   446  		exec(multiSimdSize, 1, 0, nullProb)
   447  		exec(multiSimdSize+33, 1, 0, nullProb)
   448  		exec(multiSimdSize, 1, 33, nullProb)
   449  		exec(multiSimdSize+33, 1, 33, nullProb)
   450  	}
   451  }
   452  
   453  func TestEncoding(t *testing.T) {
   454  	tests := []struct {
   455  		name string
   456  		typ  reflect.Type
   457  	}{
   458  		{"Bool", reflect.TypeOf(true)},
   459  		{"Int32", reflect.TypeOf(int32(0))},
   460  		{"Int64", reflect.TypeOf(int64(0))},
   461  		{"Float32", reflect.TypeOf(float32(0))},
   462  		{"Float64", reflect.TypeOf(float64(0))},
   463  		{"Int96", reflect.TypeOf(parquet.Int96{})},
   464  		{"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
   465  		{"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
   466  	}
   467  
   468  	for _, tt := range tests {
   469  		t.Run(tt.name, func(t *testing.T) {
   470  			suite.Run(t, &BaseEncodingTestSuite{typ: tt.typ})
   471  		})
   472  	}
   473  }
   474  
// DictionaryEncodingTestSuite reuses BaseEncodingTestSuite's setup and data
// generation to round-trip values through dictionary encoding.
//
// NOTE(review): because BaseEncodingTestSuite is embedded, its Test* methods
// are promoted and presumably also run by suite.Run. TestBasicRoundTrip is
// overridden here.
type DictionaryEncodingTestSuite struct {
	BaseEncodingTestSuite
}
   478  
   479  func (d *DictionaryEncodingTestSuite) encodeTestDataDict(e parquet.Encoding) (dictBuffer, indices encoding.Buffer, numEntries int) {
   480  	enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
   481  
   482  	d.Equal(parquet.Encodings.PlainDict, enc.Encoding())
   483  	d.Equal(d.descr.PhysicalType(), enc.Type())
   484  	encode(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface())
   485  	dictBuffer = memory.NewResizableBuffer(d.mem)
   486  	dictBuffer.Resize(enc.DictEncodedSize())
   487  	enc.WriteDict(dictBuffer.Bytes())
   488  	indices, _ = enc.FlushValues()
   489  	numEntries = enc.NumEntries()
   490  	return
   491  }
   492  
   493  func (d *DictionaryEncodingTestSuite) encodeTestDataDictSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (dictBuffer, indices encoding.Buffer, numEntries int) {
   494  	enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
   495  	d.Equal(d.descr.PhysicalType(), enc.Type())
   496  
   497  	encodeSpaced(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), validBits, validBitsOffset)
   498  	dictBuffer = memory.NewResizableBuffer(d.mem)
   499  	dictBuffer.Resize(enc.DictEncodedSize())
   500  	enc.WriteDict(dictBuffer.Bytes())
   501  	indices, _ = enc.FlushValues()
   502  	numEntries = enc.NumEntries()
   503  	return
   504  }
   505  
   506  func (d *DictionaryEncodingTestSuite) checkRoundTrip() {
   507  	dictBuffer, indices, numEntries := d.encodeTestDataDict(parquet.Encodings.Plain)
   508  	defer dictBuffer.Release()
   509  	defer indices.Release()
   510  	validBits := make([]byte, int(bitutil.BytesForBits(int64(d.nvalues)))+1)
   511  	memory.Set(validBits, 255)
   512  
   513  	spacedBuffer, indicesSpaced, _ := d.encodeTestDataDictSpaced(parquet.Encodings.Plain, validBits, 0)
   514  	defer spacedBuffer.Release()
   515  	defer indicesSpaced.Release()
   516  	d.Equal(indices.Bytes(), indicesSpaced.Bytes())
   517  
   518  	dictDecoder := encoding.NewDecoder(testutils.TypeToParquetType(d.typ), parquet.Encodings.Plain, d.descr, d.mem)
   519  	d.Equal(d.descr.PhysicalType(), dictDecoder.Type())
   520  	dictDecoder.SetData(numEntries, dictBuffer.Bytes())
   521  	decoder := encoding.NewDictDecoder(testutils.TypeToParquetType(d.typ), d.descr, d.mem)
   522  	decoder.SetDict(dictDecoder)
   523  	decoder.SetData(d.nvalues, indices.Bytes())
   524  
   525  	decoded, _ := decode(decoder, d.decodeBuf)
   526  	d.Equal(d.nvalues, decoded)
   527  	d.Equal(reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), reflect.ValueOf(d.decodeBuf).Slice(0, d.nvalues).Interface())
   528  
   529  	decoder.SetData(d.nvalues, indices.Bytes())
   530  	decoded, _ = decodeSpaced(decoder, d.decodeBuf, 0, validBits, 0)
   531  	d.Equal(d.nvalues, decoded)
   532  	d.Equal(reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), reflect.ValueOf(d.decodeBuf).Slice(0, d.nvalues).Interface())
   533  }
   534  
   535  func (d *DictionaryEncodingTestSuite) TestBasicRoundTrip() {
   536  	d.initData(2500, 2)
   537  	d.checkRoundTrip()
   538  }
   539  
   540  func TestDictEncoding(t *testing.T) {
   541  	tests := []struct {
   542  		name string
   543  		typ  reflect.Type
   544  	}{
   545  		{"Int32", reflect.TypeOf(int32(0))},
   546  		{"Int64", reflect.TypeOf(int64(0))},
   547  		{"Float32", reflect.TypeOf(float32(0))},
   548  		{"Float64", reflect.TypeOf(float64(0))},
   549  		{"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
   550  		{"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
   551  	}
   552  
   553  	for _, tt := range tests {
   554  		t.Run(tt.name, func(t *testing.T) {
   555  			suite.Run(t, &DictionaryEncodingTestSuite{BaseEncodingTestSuite{typ: tt.typ}})
   556  		})
   557  	}
   558  }
   559  
   560  func TestWriteDeltaBitPackedInt32(t *testing.T) {
   561  	column := schema.NewColumn(schema.NewInt32Node("int32", parquet.Repetitions.Required, -1), 0, 0)
   562  
   563  	tests := []struct {
   564  		name     string
   565  		toencode []int32
   566  		expected []byte
   567  	}{
   568  		{"simple 12345", []int32{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
   569  		{"odd vals", []int32{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
   570  	}
   571  
   572  	for _, tt := range tests {
   573  		t.Run(tt.name, func(t *testing.T) {
   574  			enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   575  
   576  			enc.(encoding.Int32Encoder).Put(tt.toencode)
   577  			buf, _ := enc.FlushValues()
   578  			defer buf.Release()
   579  
   580  			assert.Equal(t, tt.expected, buf.Bytes())
   581  
   582  			dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   583  
   584  			dec.(encoding.Int32Decoder).SetData(len(tt.toencode), tt.expected)
   585  			out := make([]int32, len(tt.toencode))
   586  			dec.(encoding.Int32Decoder).Decode(out)
   587  			assert.Equal(t, tt.toencode, out)
   588  		})
   589  	}
   590  
   591  	t.Run("test progressive decoding", func(t *testing.T) {
   592  		values := make([]int32, 1000)
   593  		testutils.FillRandomInt32(0, values)
   594  
   595  		enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   596  		enc.(encoding.Int32Encoder).Put(values)
   597  		buf, _ := enc.FlushValues()
   598  		defer buf.Release()
   599  
   600  		dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   601  		dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())
   602  
   603  		valueBuf := make([]int32, 100)
   604  		for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
   605  			dec.(encoding.Int32Decoder).Decode(valueBuf)
   606  			assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
   607  		}
   608  	})
   609  
   610  	t.Run("test decoding multiple pages", func(t *testing.T) {
   611  		values := make([]int32, 1000)
   612  		testutils.FillRandomInt32(0, values)
   613  
   614  		enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   615  		enc.(encoding.Int32Encoder).Put(values)
   616  		buf, _ := enc.FlushValues()
   617  		defer buf.Release()
   618  
   619  		// Using same Decoder to decode the data.
   620  		dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   621  		for i := 0; i < 5; i += 1 {
   622  			dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())
   623  
   624  			valueBuf := make([]int32, 100)
   625  			for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
   626  				dec.(encoding.Int32Decoder).Decode(valueBuf)
   627  				assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
   628  			}
   629  		}
   630  	})
   631  }
   632  
   633  func TestWriteDeltaBitPackedInt64(t *testing.T) {
   634  	column := schema.NewColumn(schema.NewInt64Node("int64", parquet.Repetitions.Required, -1), 0, 0)
   635  
   636  	tests := []struct {
   637  		name     string
   638  		toencode []int64
   639  		expected []byte
   640  	}{
   641  		{"simple 12345", []int64{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
   642  		{"odd vals", []int64{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
   643  	}
   644  
   645  	for _, tt := range tests {
   646  		t.Run(tt.name, func(t *testing.T) {
   647  			enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   648  
   649  			enc.(encoding.Int64Encoder).Put(tt.toencode)
   650  			buf, _ := enc.FlushValues()
   651  			defer buf.Release()
   652  
   653  			assert.Equal(t, tt.expected, buf.Bytes())
   654  
   655  			dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   656  
   657  			dec.(encoding.Int64Decoder).SetData(len(tt.toencode), tt.expected)
   658  			out := make([]int64, len(tt.toencode))
   659  			dec.(encoding.Int64Decoder).Decode(out)
   660  			assert.Equal(t, tt.toencode, out)
   661  		})
   662  	}
   663  
   664  	t.Run("test progressive decoding", func(t *testing.T) {
   665  		values := make([]int64, 1000)
   666  		testutils.FillRandomInt64(0, values)
   667  
   668  		enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   669  		enc.(encoding.Int64Encoder).Put(values)
   670  		buf, _ := enc.FlushValues()
   671  		defer buf.Release()
   672  
   673  		dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   674  		dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
   675  
   676  		valueBuf := make([]int64, 100)
   677  		for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
   678  			decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
   679  			assert.Equal(t, len(valueBuf), decoded)
   680  			assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
   681  		}
   682  	})
   683  
   684  	t.Run("GH-37102", func(t *testing.T) {
   685  		values := []int64{
   686  			0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   687  			0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   688  			0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   689  			0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   690  			0, 0,
   691  		}
   692  
   693  		enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   694  		enc.(encoding.Int64Encoder).Put(values)
   695  		buf, _ := enc.FlushValues()
   696  		defer buf.Release()
   697  
   698  		dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   699  		dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
   700  
   701  		valueBuf := make([]int64, len(values))
   702  
   703  		decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
   704  		assert.Equal(t, len(valueBuf), decoded)
   705  		assert.Equal(t, values, valueBuf)
   706  	})
   707  
   708  	t.Run("test decoding multiple pages", func(t *testing.T) {
   709  		values := make([]int64, 1000)
   710  		testutils.FillRandomInt64(0, values)
   711  
   712  		enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
   713  		enc.(encoding.Int64Encoder).Put(values)
   714  		buf, _ := enc.FlushValues()
   715  		defer buf.Release()
   716  
   717  		// Using same Decoder to decode the data.
   718  		dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
   719  		for i := 0; i < 5; i += 1 {
   720  			dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
   721  
   722  			valueBuf := make([]int64, 100)
   723  			for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
   724  				dec.(encoding.Int64Decoder).Decode(valueBuf)
   725  				assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
   726  			}
   727  		}
   728  	})
   729  }
   730  
   731  func TestDeltaLengthByteArrayEncoding(t *testing.T) {
   732  	column := schema.NewColumn(schema.NewByteArrayNode("bytearray", parquet.Repetitions.Required, -1), 0, 0)
   733  
   734  	test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
   735  	expected := []byte{128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}
   736  
   737  	enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, false, column, memory.DefaultAllocator)
   738  	enc.(encoding.ByteArrayEncoder).Put(test)
   739  	buf, _ := enc.FlushValues()
   740  	defer buf.Release()
   741  
   742  	assert.Equal(t, expected, buf.Bytes())
   743  
   744  	dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, column, nil)
   745  	dec.SetData(len(test), expected)
   746  	out := make([]parquet.ByteArray, len(test))
   747  	decoded, _ := dec.(encoding.ByteArrayDecoder).Decode(out)
   748  	assert.Equal(t, len(test), decoded)
   749  	assert.Equal(t, test, out)
   750  }
   751  
   752  func TestDeltaByteArrayEncoding(t *testing.T) {
   753  	test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
   754  	expected := []byte{128, 1, 4, 4, 0, 0, 0, 0, 0, 0, 128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}
   755  
   756  	enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, false, nil, nil)
   757  	enc.(encoding.ByteArrayEncoder).Put(test)
   758  	buf, _ := enc.FlushValues()
   759  	defer buf.Release()
   760  
   761  	assert.Equal(t, expected, buf.Bytes())
   762  
   763  	dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, nil, nil)
   764  	dec.SetData(len(test), expected)
   765  	out := make([]parquet.ByteArray, len(test))
   766  	decoded, _ := dec.(encoding.ByteArrayDecoder).Decode(out)
   767  	assert.Equal(t, len(test), decoded)
   768  	assert.Equal(t, test, out)
   769  }
   770  
   771  func TestDeltaBitPacking(t *testing.T) {
   772  	datadir := os.Getenv("ARROW_TEST_DATA")
   773  	if datadir == "" {
   774  		return
   775  	}
   776  
   777  	fname := path.Join(datadir, "parquet/timestamp.data")
   778  	require.FileExists(t, fname)
   779  	f, err := os.Open(fname)
   780  	if err != nil {
   781  		t.Fatal(err)
   782  	}
   783  	defer f.Close()
   784  
   785  	values := make([]int64, 0)
   786  
   787  	scanner := bufio.NewScanner(f)
   788  	for scanner.Scan() {
   789  		v, err := strconv.ParseInt(scanner.Text(), 10, 64)
   790  		if err != nil {
   791  			t.Fatal(err)
   792  		}
   793  		values = append(values, v)
   794  	}
   795  
   796  	if err := scanner.Err(); err != nil {
   797  		t.Fatal(err)
   798  	}
   799  
   800  	col := schema.NewColumn(schema.MustPrimitive(schema.NewPrimitiveNode("foo", parquet.Repetitions.Required,
   801  		parquet.Types.Int64, -1, -1)), 0, 0)
   802  	enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, col, memory.DefaultAllocator).(encoding.Int64Encoder)
   803  
   804  	enc.Put(values)
   805  	buf, err := enc.FlushValues()
   806  	if err != nil {
   807  		t.Fatal(err)
   808  	}
   809  	defer buf.Release()
   810  
   811  	dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, col, memory.DefaultAllocator).(encoding.Int64Decoder)
   812  	dec.SetData(len(values), buf.Bytes())
   813  
   814  	ll := len(values)
   815  	for i := 0; i < ll; i += 1024 {
   816  		out := make([]int64, 1024)
   817  		n, err := dec.Decode(out)
   818  		if err != nil {
   819  			t.Fatal(err)
   820  		}
   821  		assert.Equal(t, values[:n], out[:n])
   822  		values = values[n:]
   823  	}
   824  	assert.Equal(t, dec.ValuesLeft(), 0)
   825  }
   826  
   827  func TestBooleanPlainDecoderAfterFlushing(t *testing.T) {
   828  	descr := schema.NewColumn(schema.NewBooleanNode("bool", parquet.Repetitions.Optional, -1), 0, 0)
   829  	enc := encoding.NewEncoder(parquet.Types.Boolean, parquet.Encodings.Plain, false, descr, memory.DefaultAllocator)
   830  	benc := enc.(encoding.BooleanEncoder)
   831  
   832  	dec := encoding.NewDecoder(parquet.Types.Boolean, parquet.Encodings.Plain, descr, memory.DefaultAllocator)
   833  	decSlice := make([]bool, 1)
   834  	bdec := dec.(encoding.BooleanDecoder)
   835  
   836  	// Write and extract two different values
   837  	// This is validating that `FlushValues` wholly
   838  	// resets the encoder state.
   839  	benc.Put([]bool{true})
   840  	buf1, err := benc.FlushValues()
   841  	assert.NoError(t, err)
   842  
   843  	benc.Put([]bool{false})
   844  	buf2, err := benc.FlushValues()
   845  	assert.NoError(t, err)
   846  
   847  	// Decode buf1, expect true
   848  	err = bdec.SetData(1, buf1.Buf())
   849  	assert.NoError(t, err)
   850  	n, err := bdec.Decode(decSlice)
   851  	assert.NoError(t, err)
   852  	assert.Equal(t, n, 1)
   853  	assert.Equal(t, decSlice[0], true)
   854  
   855  	// Decode buf2, expect false
   856  	err = bdec.SetData(1, buf2.Buf())
   857  	assert.NoError(t, err)
   858  	n, err = bdec.Decode(decSlice)
   859  	assert.NoError(t, err)
   860  	assert.Equal(t, n, 1)
   861  	assert.Equal(t, decSlice[0], false)
   862  }
   863  

View as plain text