1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding_test
18
19 import (
20 "bufio"
21 "fmt"
22 "os"
23 "path"
24 "reflect"
25 "strconv"
26 "testing"
27 "unsafe"
28
29 "github.com/apache/arrow/go/v15/arrow"
30 "github.com/apache/arrow/go/v15/arrow/bitutil"
31 "github.com/apache/arrow/go/v15/arrow/memory"
32 "github.com/apache/arrow/go/v15/parquet"
33 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
34 "github.com/apache/arrow/go/v15/parquet/internal/testutils"
35 "github.com/apache/arrow/go/v15/parquet/schema"
36 "github.com/stretchr/testify/assert"
37 "github.com/stretchr/testify/require"
38 "github.com/stretchr/testify/suite"
39 )
40
// nodeFactory is the common signature of the schema constructors used by these
// tests to build a primitive node. The wrapper built for FixedLenByteArray in
// createNodeFactory names the three arguments name, rep and field.
type nodeFactory func(string, parquet.Repetition, int32) *schema.PrimitiveNode
42
43 func createNodeFactory(t reflect.Type) nodeFactory {
44 switch t {
45 case reflect.TypeOf(true):
46 return schema.NewBooleanNode
47 case reflect.TypeOf(int32(0)):
48 return schema.NewInt32Node
49 case reflect.TypeOf(int64(0)):
50 return schema.NewInt64Node
51 case reflect.TypeOf(parquet.Int96{}):
52 return schema.NewInt96Node
53 case reflect.TypeOf(float32(0)):
54 return schema.NewFloat32Node
55 case reflect.TypeOf(float64(0)):
56 return schema.NewFloat64Node
57 case reflect.TypeOf(parquet.ByteArray{}):
58 return schema.NewByteArrayNode
59 case reflect.TypeOf(parquet.FixedLenByteArray{}):
60 return func(name string, rep parquet.Repetition, field int32) *schema.PrimitiveNode {
61 return schema.NewFixedLenByteArrayNode(name, rep, 12, field)
62 }
63 }
64 return nil
65 }
66
67 func initdata(t reflect.Type, drawbuf, decodebuf []byte, nvals, repeats int, heap *memory.Buffer) (interface{}, interface{}) {
68 switch t {
69 case reflect.TypeOf(true):
70 draws := *(*[]bool)(unsafe.Pointer(&drawbuf))
71 decode := *(*[]bool)(unsafe.Pointer(&decodebuf))
72 testutils.InitValues(draws[:nvals], heap)
73
74 for j := 1; j < repeats; j++ {
75 for k := 0; k < nvals; k++ {
76 draws[nvals*j+k] = draws[k]
77 }
78 }
79
80 return draws[:nvals*repeats], decode[:nvals*repeats]
81 case reflect.TypeOf(int32(0)):
82 draws := arrow.Int32Traits.CastFromBytes(drawbuf)
83 decode := arrow.Int32Traits.CastFromBytes(decodebuf)
84 testutils.InitValues(draws[:nvals], heap)
85
86 for j := 1; j < repeats; j++ {
87 for k := 0; k < nvals; k++ {
88 draws[nvals*j+k] = draws[k]
89 }
90 }
91
92 return draws[:nvals*repeats], decode[:nvals*repeats]
93 case reflect.TypeOf(int64(0)):
94 draws := arrow.Int64Traits.CastFromBytes(drawbuf)
95 decode := arrow.Int64Traits.CastFromBytes(decodebuf)
96 testutils.InitValues(draws[:nvals], heap)
97
98 for j := 1; j < repeats; j++ {
99 for k := 0; k < nvals; k++ {
100 draws[nvals*j+k] = draws[k]
101 }
102 }
103
104 return draws[:nvals*repeats], decode[:nvals*repeats]
105 case reflect.TypeOf(parquet.Int96{}):
106 draws := parquet.Int96Traits.CastFromBytes(drawbuf)
107 decode := parquet.Int96Traits.CastFromBytes(decodebuf)
108 testutils.InitValues(draws[:nvals], heap)
109
110 for j := 1; j < repeats; j++ {
111 for k := 0; k < nvals; k++ {
112 draws[nvals*j+k] = draws[k]
113 }
114 }
115
116 return draws[:nvals*repeats], decode[:nvals*repeats]
117 case reflect.TypeOf(float32(0)):
118 draws := arrow.Float32Traits.CastFromBytes(drawbuf)
119 decode := arrow.Float32Traits.CastFromBytes(decodebuf)
120 testutils.InitValues(draws[:nvals], heap)
121
122 for j := 1; j < repeats; j++ {
123 for k := 0; k < nvals; k++ {
124 draws[nvals*j+k] = draws[k]
125 }
126 }
127
128 return draws[:nvals*repeats], decode[:nvals*repeats]
129 case reflect.TypeOf(float64(0)):
130 draws := arrow.Float64Traits.CastFromBytes(drawbuf)
131 decode := arrow.Float64Traits.CastFromBytes(decodebuf)
132 testutils.InitValues(draws[:nvals], heap)
133
134 for j := 1; j < repeats; j++ {
135 for k := 0; k < nvals; k++ {
136 draws[nvals*j+k] = draws[k]
137 }
138 }
139
140 return draws[:nvals*repeats], decode[:nvals*repeats]
141 case reflect.TypeOf(parquet.ByteArray{}):
142 draws := make([]parquet.ByteArray, nvals*repeats)
143 decode := make([]parquet.ByteArray, nvals*repeats)
144 testutils.InitValues(draws[:nvals], heap)
145
146 for j := 1; j < repeats; j++ {
147 for k := 0; k < nvals; k++ {
148 draws[nvals*j+k] = draws[k]
149 }
150 }
151
152 return draws[:nvals*repeats], decode[:nvals*repeats]
153 case reflect.TypeOf(parquet.FixedLenByteArray{}):
154 draws := make([]parquet.FixedLenByteArray, nvals*repeats)
155 decode := make([]parquet.FixedLenByteArray, nvals*repeats)
156 testutils.InitValues(draws[:nvals], heap)
157
158 for j := 1; j < repeats; j++ {
159 for k := 0; k < nvals; k++ {
160 draws[nvals*j+k] = draws[k]
161 }
162 }
163
164 return draws[:nvals*repeats], decode[:nvals*repeats]
165 }
166 return nil, nil
167 }
168
169 func encode(enc encoding.TypedEncoder, vals interface{}) {
170 switch v := vals.(type) {
171 case []bool:
172 enc.(encoding.BooleanEncoder).Put(v)
173 case []int32:
174 enc.(encoding.Int32Encoder).Put(v)
175 case []int64:
176 enc.(encoding.Int64Encoder).Put(v)
177 case []parquet.Int96:
178 enc.(encoding.Int96Encoder).Put(v)
179 case []float32:
180 enc.(encoding.Float32Encoder).Put(v)
181 case []float64:
182 enc.(encoding.Float64Encoder).Put(v)
183 case []parquet.ByteArray:
184 enc.(encoding.ByteArrayEncoder).Put(v)
185 case []parquet.FixedLenByteArray:
186 enc.(encoding.FixedLenByteArrayEncoder).Put(v)
187 }
188 }
189
190 func encodeSpaced(enc encoding.TypedEncoder, vals interface{}, validBits []byte, validBitsOffset int64) {
191 switch v := vals.(type) {
192 case []bool:
193 enc.(encoding.BooleanEncoder).PutSpaced(v, validBits, validBitsOffset)
194 case []int32:
195 enc.(encoding.Int32Encoder).PutSpaced(v, validBits, validBitsOffset)
196 case []int64:
197 enc.(encoding.Int64Encoder).PutSpaced(v, validBits, validBitsOffset)
198 case []parquet.Int96:
199 enc.(encoding.Int96Encoder).PutSpaced(v, validBits, validBitsOffset)
200 case []float32:
201 enc.(encoding.Float32Encoder).PutSpaced(v, validBits, validBitsOffset)
202 case []float64:
203 enc.(encoding.Float64Encoder).PutSpaced(v, validBits, validBitsOffset)
204 case []parquet.ByteArray:
205 enc.(encoding.ByteArrayEncoder).PutSpaced(v, validBits, validBitsOffset)
206 case []parquet.FixedLenByteArray:
207 enc.(encoding.FixedLenByteArrayEncoder).PutSpaced(v, validBits, validBitsOffset)
208 }
209 }
210
211 func decode(dec encoding.TypedDecoder, out interface{}) (int, error) {
212 switch v := out.(type) {
213 case []bool:
214 return dec.(encoding.BooleanDecoder).Decode(v)
215 case []int32:
216 return dec.(encoding.Int32Decoder).Decode(v)
217 case []int64:
218 return dec.(encoding.Int64Decoder).Decode(v)
219 case []parquet.Int96:
220 return dec.(encoding.Int96Decoder).Decode(v)
221 case []float32:
222 return dec.(encoding.Float32Decoder).Decode(v)
223 case []float64:
224 return dec.(encoding.Float64Decoder).Decode(v)
225 case []parquet.ByteArray:
226 return dec.(encoding.ByteArrayDecoder).Decode(v)
227 case []parquet.FixedLenByteArray:
228 return dec.(encoding.FixedLenByteArrayDecoder).Decode(v)
229 }
230 return 0, nil
231 }
232
233 func decodeSpaced(dec encoding.TypedDecoder, out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
234 switch v := out.(type) {
235 case []bool:
236 return dec.(encoding.BooleanDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
237 case []int32:
238 return dec.(encoding.Int32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
239 case []int64:
240 return dec.(encoding.Int64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
241 case []parquet.Int96:
242 return dec.(encoding.Int96Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
243 case []float32:
244 return dec.(encoding.Float32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
245 case []float64:
246 return dec.(encoding.Float64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
247 case []parquet.ByteArray:
248 return dec.(encoding.ByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
249 case []parquet.FixedLenByteArray:
250 return dec.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
251 }
252 return 0, nil
253 }
254
// BaseEncodingTestSuite is a testify suite that runs encoder/decoder
// round-trip tests for a single Go value type (typ).
type BaseEncodingTestSuite struct {
	suite.Suite

	// descr is rebuilt in SetupTest from nodeFactory; typeLen caches its
	// TypeLength.
	descr   *schema.Column
	typeLen int
	// mem is set to memory.DefaultAllocator in SetupSuite.
	mem memory.Allocator
	// typ is the element type under test; it is supplied when the suite is
	// constructed.
	typ reflect.Type

	// nvalues is the total number of values prepared by initData
	// (nvalues * repeats of its arguments).
	nvalues int
	// heap is passed to testutils.InitValues when generating values.
	heap *memory.Buffer
	// inputBytes and outputBytes are the raw buffers handed to initdata as
	// backing storage for draws and decodeBuf.
	inputBytes  *memory.Buffer
	outputBytes *memory.Buffer
	nodeFactory nodeFactory

	// draws holds the generated input values and decodeBuf receives the
	// decoded values; both are typed slices stored as interface{}.
	draws     interface{}
	decodeBuf interface{}
}
272
273 func (b *BaseEncodingTestSuite) SetupSuite() {
274 b.mem = memory.DefaultAllocator
275 b.inputBytes = memory.NewResizableBuffer(b.mem)
276 b.outputBytes = memory.NewResizableBuffer(b.mem)
277 b.heap = memory.NewResizableBuffer(b.mem)
278 b.nodeFactory = createNodeFactory(b.typ)
279 }
280
281 func (b *BaseEncodingTestSuite) TearDownSuite() {
282 b.inputBytes.Release()
283 b.outputBytes.Release()
284 b.heap.Release()
285 }
286
287 func (b *BaseEncodingTestSuite) SetupTest() {
288 b.descr = schema.NewColumn(b.nodeFactory("name", parquet.Repetitions.Optional, -1), 0, 0)
289 b.typeLen = int(b.descr.TypeLength())
290 }
291
292 func (b *BaseEncodingTestSuite) initData(nvalues, repeats int) {
293 b.nvalues = nvalues * repeats
294 b.inputBytes.ResizeNoShrink(b.nvalues * int(b.typ.Size()))
295 b.outputBytes.ResizeNoShrink(b.nvalues * int(b.typ.Size()))
296 memory.Set(b.inputBytes.Buf(), 0)
297 memory.Set(b.outputBytes.Buf(), 0)
298
299 b.draws, b.decodeBuf = initdata(b.typ, b.inputBytes.Buf(), b.outputBytes.Buf(), nvalues, repeats, b.heap)
300 }
301
302 func (b *BaseEncodingTestSuite) encodeTestData(e parquet.Encoding) (encoding.Buffer, error) {
303 enc := encoding.NewEncoder(testutils.TypeToParquetType(b.typ), e, false, b.descr, memory.DefaultAllocator)
304 b.Equal(e, enc.Encoding())
305 b.Equal(b.descr.PhysicalType(), enc.Type())
306 encode(enc, reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface())
307 return enc.FlushValues()
308 }
309
310 func (b *BaseEncodingTestSuite) decodeTestData(e parquet.Encoding, buf []byte) {
311 dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
312 b.Equal(e, dec.Encoding())
313 b.Equal(b.descr.PhysicalType(), dec.Type())
314
315 dec.SetData(b.nvalues, buf)
316 decoded, _ := decode(dec, b.decodeBuf)
317 b.Equal(b.nvalues, decoded)
318 b.Equal(reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), reflect.ValueOf(b.decodeBuf).Slice(0, b.nvalues).Interface())
319 }
320
321 func (b *BaseEncodingTestSuite) encodeTestDataSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (encoding.Buffer, error) {
322 enc := encoding.NewEncoder(testutils.TypeToParquetType(b.typ), e, false, b.descr, memory.DefaultAllocator)
323 encodeSpaced(enc, reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), validBits, validBitsOffset)
324 return enc.FlushValues()
325 }
326
327 func (b *BaseEncodingTestSuite) decodeTestDataSpaced(e parquet.Encoding, nullCount int, buf []byte, validBits []byte, validBitsOffset int64) {
328 dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
329 dec.SetData(b.nvalues-nullCount, buf)
330 decoded, _ := decodeSpaced(dec, b.decodeBuf, nullCount, validBits, validBitsOffset)
331 b.Equal(b.nvalues, decoded)
332
333 drawval := reflect.ValueOf(b.draws)
334 decodeval := reflect.ValueOf(b.decodeBuf)
335 for j := 0; j < b.nvalues; j++ {
336 if bitutil.BitIsSet(validBits, int(validBitsOffset)+j) {
337 b.Equal(drawval.Index(j).Interface(), decodeval.Index(j).Interface())
338 }
339 }
340 }
341
342 func (b *BaseEncodingTestSuite) checkRoundTrip(e parquet.Encoding) {
343 buf, _ := b.encodeTestData(e)
344 defer buf.Release()
345 b.decodeTestData(e, buf.Bytes())
346 }
347
348 func (b *BaseEncodingTestSuite) checkRoundTripSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) {
349 buf, _ := b.encodeTestDataSpaced(e, validBits, validBitsOffset)
350 defer buf.Release()
351
352 nullCount := 0
353 for i := 0; i < b.nvalues; i++ {
354 if bitutil.BitIsNotSet(validBits, int(validBitsOffset)+i) {
355 nullCount++
356 }
357 }
358 b.decodeTestDataSpaced(e, nullCount, buf.Bytes(), validBits, validBitsOffset)
359 }
360
361 func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
362 b.initData(10000, 1)
363 b.checkRoundTrip(parquet.Encodings.Plain)
364 }
365
366 func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
367 switch b.typ {
368 case reflect.TypeOf(true):
369 b.initData(2000, 200)
370 b.checkRoundTrip(parquet.Encodings.RLE)
371 default:
372 b.T().SkipNow()
373 }
374 }
375
376 func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
377 b.initData(10000, 1)
378
379 switch b.typ {
380 case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
381 b.checkRoundTrip(parquet.Encodings.DeltaBinaryPacked)
382 default:
383 b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaBinaryPacked) })
384 }
385 }
386
387 func (b *BaseEncodingTestSuite) TestDeltaLengthByteArrayRoundTrip() {
388 b.initData(10000, 1)
389
390 switch b.typ {
391 case reflect.TypeOf(parquet.ByteArray{}):
392 b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray)
393 default:
394 b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray) })
395 }
396 }
397
398 func (b *BaseEncodingTestSuite) TestDeltaByteArrayRoundTrip() {
399 b.initData(10000, 1)
400
401 switch b.typ {
402 case reflect.TypeOf(parquet.ByteArray{}):
403 b.checkRoundTrip(parquet.Encodings.DeltaByteArray)
404 default:
405 b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaLengthByteArray) })
406 }
407 }
408
409 func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
410 exec := func(vals, repeats int, validBitsOffset int64, nullProb float64) {
411 b.Run(fmt.Sprintf("%d vals %d repeats %d offset %0.3f null", vals, repeats, validBitsOffset, 1-nullProb), func() {
412 b.initData(vals, repeats)
413
414 size := int64(b.nvalues) + validBitsOffset
415 r := testutils.NewRandomArrayGenerator(1923)
416 arr := r.Uint8(size, 0, 100, 1-nullProb)
417 validBits := arr.NullBitmapBytes()
418 if validBits != nil {
419 b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset)
420 switch b.typ {
421 case reflect.TypeOf(false):
422 b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
423 case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
424 b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset)
425 case reflect.TypeOf(parquet.ByteArray{}):
426 b.checkRoundTripSpaced(parquet.Encodings.DeltaLengthByteArray, validBits, validBitsOffset)
427 b.checkRoundTripSpaced(parquet.Encodings.DeltaByteArray, validBits, validBitsOffset)
428 }
429 }
430 })
431 }
432
433 const (
434 avx512Size = 64
435 simdSize = avx512Size
436 multiSimdSize = simdSize * 33
437 )
438
439 for _, nullProb := range []float64{0.001, 0.1, 0.5, 0.9, 0.999} {
440
441 for i := 1; i < simdSize*3; i++ {
442 exec(i, 1, 0, nullProb)
443 exec(i, 1, int64(i+1), nullProb)
444 }
445
446 exec(multiSimdSize, 1, 0, nullProb)
447 exec(multiSimdSize+33, 1, 0, nullProb)
448 exec(multiSimdSize, 1, 33, nullProb)
449 exec(multiSimdSize+33, 1, 33, nullProb)
450 }
451 }
452
453 func TestEncoding(t *testing.T) {
454 tests := []struct {
455 name string
456 typ reflect.Type
457 }{
458 {"Bool", reflect.TypeOf(true)},
459 {"Int32", reflect.TypeOf(int32(0))},
460 {"Int64", reflect.TypeOf(int64(0))},
461 {"Float32", reflect.TypeOf(float32(0))},
462 {"Float64", reflect.TypeOf(float64(0))},
463 {"Int96", reflect.TypeOf(parquet.Int96{})},
464 {"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
465 {"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
466 }
467
468 for _, tt := range tests {
469 t.Run(tt.name, func(t *testing.T) {
470 suite.Run(t, &BaseEncodingTestSuite{typ: tt.typ})
471 })
472 }
473 }
474
// DictionaryEncodingTestSuite embeds BaseEncodingTestSuite and adds
// dictionary-encoding round-trip helpers and tests on top of it.
type DictionaryEncodingTestSuite struct {
	BaseEncodingTestSuite
}
478
479 func (d *DictionaryEncodingTestSuite) encodeTestDataDict(e parquet.Encoding) (dictBuffer, indices encoding.Buffer, numEntries int) {
480 enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
481
482 d.Equal(parquet.Encodings.PlainDict, enc.Encoding())
483 d.Equal(d.descr.PhysicalType(), enc.Type())
484 encode(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface())
485 dictBuffer = memory.NewResizableBuffer(d.mem)
486 dictBuffer.Resize(enc.DictEncodedSize())
487 enc.WriteDict(dictBuffer.Bytes())
488 indices, _ = enc.FlushValues()
489 numEntries = enc.NumEntries()
490 return
491 }
492
493 func (d *DictionaryEncodingTestSuite) encodeTestDataDictSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (dictBuffer, indices encoding.Buffer, numEntries int) {
494 enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
495 d.Equal(d.descr.PhysicalType(), enc.Type())
496
497 encodeSpaced(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), validBits, validBitsOffset)
498 dictBuffer = memory.NewResizableBuffer(d.mem)
499 dictBuffer.Resize(enc.DictEncodedSize())
500 enc.WriteDict(dictBuffer.Bytes())
501 indices, _ = enc.FlushValues()
502 numEntries = enc.NumEntries()
503 return
504 }
505
506 func (d *DictionaryEncodingTestSuite) checkRoundTrip() {
507 dictBuffer, indices, numEntries := d.encodeTestDataDict(parquet.Encodings.Plain)
508 defer dictBuffer.Release()
509 defer indices.Release()
510 validBits := make([]byte, int(bitutil.BytesForBits(int64(d.nvalues)))+1)
511 memory.Set(validBits, 255)
512
513 spacedBuffer, indicesSpaced, _ := d.encodeTestDataDictSpaced(parquet.Encodings.Plain, validBits, 0)
514 defer spacedBuffer.Release()
515 defer indicesSpaced.Release()
516 d.Equal(indices.Bytes(), indicesSpaced.Bytes())
517
518 dictDecoder := encoding.NewDecoder(testutils.TypeToParquetType(d.typ), parquet.Encodings.Plain, d.descr, d.mem)
519 d.Equal(d.descr.PhysicalType(), dictDecoder.Type())
520 dictDecoder.SetData(numEntries, dictBuffer.Bytes())
521 decoder := encoding.NewDictDecoder(testutils.TypeToParquetType(d.typ), d.descr, d.mem)
522 decoder.SetDict(dictDecoder)
523 decoder.SetData(d.nvalues, indices.Bytes())
524
525 decoded, _ := decode(decoder, d.decodeBuf)
526 d.Equal(d.nvalues, decoded)
527 d.Equal(reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), reflect.ValueOf(d.decodeBuf).Slice(0, d.nvalues).Interface())
528
529 decoder.SetData(d.nvalues, indices.Bytes())
530 decoded, _ = decodeSpaced(decoder, d.decodeBuf, 0, validBits, 0)
531 d.Equal(d.nvalues, decoded)
532 d.Equal(reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), reflect.ValueOf(d.decodeBuf).Slice(0, d.nvalues).Interface())
533 }
534
535 func (d *DictionaryEncodingTestSuite) TestBasicRoundTrip() {
536 d.initData(2500, 2)
537 d.checkRoundTrip()
538 }
539
540 func TestDictEncoding(t *testing.T) {
541 tests := []struct {
542 name string
543 typ reflect.Type
544 }{
545 {"Int32", reflect.TypeOf(int32(0))},
546 {"Int64", reflect.TypeOf(int64(0))},
547 {"Float32", reflect.TypeOf(float32(0))},
548 {"Float64", reflect.TypeOf(float64(0))},
549 {"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
550 {"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
551 }
552
553 for _, tt := range tests {
554 t.Run(tt.name, func(t *testing.T) {
555 suite.Run(t, &DictionaryEncodingTestSuite{BaseEncodingTestSuite{typ: tt.typ}})
556 })
557 }
558 }
559
560 func TestWriteDeltaBitPackedInt32(t *testing.T) {
561 column := schema.NewColumn(schema.NewInt32Node("int32", parquet.Repetitions.Required, -1), 0, 0)
562
563 tests := []struct {
564 name string
565 toencode []int32
566 expected []byte
567 }{
568 {"simple 12345", []int32{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
569 {"odd vals", []int32{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
570 }
571
572 for _, tt := range tests {
573 t.Run(tt.name, func(t *testing.T) {
574 enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
575
576 enc.(encoding.Int32Encoder).Put(tt.toencode)
577 buf, _ := enc.FlushValues()
578 defer buf.Release()
579
580 assert.Equal(t, tt.expected, buf.Bytes())
581
582 dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
583
584 dec.(encoding.Int32Decoder).SetData(len(tt.toencode), tt.expected)
585 out := make([]int32, len(tt.toencode))
586 dec.(encoding.Int32Decoder).Decode(out)
587 assert.Equal(t, tt.toencode, out)
588 })
589 }
590
591 t.Run("test progressive decoding", func(t *testing.T) {
592 values := make([]int32, 1000)
593 testutils.FillRandomInt32(0, values)
594
595 enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
596 enc.(encoding.Int32Encoder).Put(values)
597 buf, _ := enc.FlushValues()
598 defer buf.Release()
599
600 dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
601 dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())
602
603 valueBuf := make([]int32, 100)
604 for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
605 dec.(encoding.Int32Decoder).Decode(valueBuf)
606 assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
607 }
608 })
609
610 t.Run("test decoding multiple pages", func(t *testing.T) {
611 values := make([]int32, 1000)
612 testutils.FillRandomInt32(0, values)
613
614 enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
615 enc.(encoding.Int32Encoder).Put(values)
616 buf, _ := enc.FlushValues()
617 defer buf.Release()
618
619
620 dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
621 for i := 0; i < 5; i += 1 {
622 dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())
623
624 valueBuf := make([]int32, 100)
625 for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
626 dec.(encoding.Int32Decoder).Decode(valueBuf)
627 assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
628 }
629 }
630 })
631 }
632
633 func TestWriteDeltaBitPackedInt64(t *testing.T) {
634 column := schema.NewColumn(schema.NewInt64Node("int64", parquet.Repetitions.Required, -1), 0, 0)
635
636 tests := []struct {
637 name string
638 toencode []int64
639 expected []byte
640 }{
641 {"simple 12345", []int64{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
642 {"odd vals", []int64{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
643 }
644
645 for _, tt := range tests {
646 t.Run(tt.name, func(t *testing.T) {
647 enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
648
649 enc.(encoding.Int64Encoder).Put(tt.toencode)
650 buf, _ := enc.FlushValues()
651 defer buf.Release()
652
653 assert.Equal(t, tt.expected, buf.Bytes())
654
655 dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
656
657 dec.(encoding.Int64Decoder).SetData(len(tt.toencode), tt.expected)
658 out := make([]int64, len(tt.toencode))
659 dec.(encoding.Int64Decoder).Decode(out)
660 assert.Equal(t, tt.toencode, out)
661 })
662 }
663
664 t.Run("test progressive decoding", func(t *testing.T) {
665 values := make([]int64, 1000)
666 testutils.FillRandomInt64(0, values)
667
668 enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
669 enc.(encoding.Int64Encoder).Put(values)
670 buf, _ := enc.FlushValues()
671 defer buf.Release()
672
673 dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
674 dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
675
676 valueBuf := make([]int64, 100)
677 for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
678 decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
679 assert.Equal(t, len(valueBuf), decoded)
680 assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
681 }
682 })
683
684 t.Run("GH-37102", func(t *testing.T) {
685 values := []int64{
686 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
687 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
688 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
689 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
690 0, 0,
691 }
692
693 enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
694 enc.(encoding.Int64Encoder).Put(values)
695 buf, _ := enc.FlushValues()
696 defer buf.Release()
697
698 dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
699 dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
700
701 valueBuf := make([]int64, len(values))
702
703 decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
704 assert.Equal(t, len(valueBuf), decoded)
705 assert.Equal(t, values, valueBuf)
706 })
707
708 t.Run("test decoding multiple pages", func(t *testing.T) {
709 values := make([]int64, 1000)
710 testutils.FillRandomInt64(0, values)
711
712 enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
713 enc.(encoding.Int64Encoder).Put(values)
714 buf, _ := enc.FlushValues()
715 defer buf.Release()
716
717
718 dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
719 for i := 0; i < 5; i += 1 {
720 dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
721
722 valueBuf := make([]int64, 100)
723 for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
724 dec.(encoding.Int64Decoder).Decode(valueBuf)
725 assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
726 }
727 }
728 })
729 }
730
731 func TestDeltaLengthByteArrayEncoding(t *testing.T) {
732 column := schema.NewColumn(schema.NewByteArrayNode("bytearray", parquet.Repetitions.Required, -1), 0, 0)
733
734 test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
735 expected := []byte{128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}
736
737 enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, false, column, memory.DefaultAllocator)
738 enc.(encoding.ByteArrayEncoder).Put(test)
739 buf, _ := enc.FlushValues()
740 defer buf.Release()
741
742 assert.Equal(t, expected, buf.Bytes())
743
744 dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, column, nil)
745 dec.SetData(len(test), expected)
746 out := make([]parquet.ByteArray, len(test))
747 decoded, _ := dec.(encoding.ByteArrayDecoder).Decode(out)
748 assert.Equal(t, len(test), decoded)
749 assert.Equal(t, test, out)
750 }
751
752 func TestDeltaByteArrayEncoding(t *testing.T) {
753 test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
754 expected := []byte{128, 1, 4, 4, 0, 0, 0, 0, 0, 0, 128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}
755
756 enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, false, nil, nil)
757 enc.(encoding.ByteArrayEncoder).Put(test)
758 buf, _ := enc.FlushValues()
759 defer buf.Release()
760
761 assert.Equal(t, expected, buf.Bytes())
762
763 dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, nil, nil)
764 dec.SetData(len(test), expected)
765 out := make([]parquet.ByteArray, len(test))
766 decoded, _ := dec.(encoding.ByteArrayDecoder).Decode(out)
767 assert.Equal(t, len(test), decoded)
768 assert.Equal(t, test, out)
769 }
770
771 func TestDeltaBitPacking(t *testing.T) {
772 datadir := os.Getenv("ARROW_TEST_DATA")
773 if datadir == "" {
774 return
775 }
776
777 fname := path.Join(datadir, "parquet/timestamp.data")
778 require.FileExists(t, fname)
779 f, err := os.Open(fname)
780 if err != nil {
781 t.Fatal(err)
782 }
783 defer f.Close()
784
785 values := make([]int64, 0)
786
787 scanner := bufio.NewScanner(f)
788 for scanner.Scan() {
789 v, err := strconv.ParseInt(scanner.Text(), 10, 64)
790 if err != nil {
791 t.Fatal(err)
792 }
793 values = append(values, v)
794 }
795
796 if err := scanner.Err(); err != nil {
797 t.Fatal(err)
798 }
799
800 col := schema.NewColumn(schema.MustPrimitive(schema.NewPrimitiveNode("foo", parquet.Repetitions.Required,
801 parquet.Types.Int64, -1, -1)), 0, 0)
802 enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, col, memory.DefaultAllocator).(encoding.Int64Encoder)
803
804 enc.Put(values)
805 buf, err := enc.FlushValues()
806 if err != nil {
807 t.Fatal(err)
808 }
809 defer buf.Release()
810
811 dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, col, memory.DefaultAllocator).(encoding.Int64Decoder)
812 dec.SetData(len(values), buf.Bytes())
813
814 ll := len(values)
815 for i := 0; i < ll; i += 1024 {
816 out := make([]int64, 1024)
817 n, err := dec.Decode(out)
818 if err != nil {
819 t.Fatal(err)
820 }
821 assert.Equal(t, values[:n], out[:n])
822 values = values[n:]
823 }
824 assert.Equal(t, dec.ValuesLeft(), 0)
825 }
826
827 func TestBooleanPlainDecoderAfterFlushing(t *testing.T) {
828 descr := schema.NewColumn(schema.NewBooleanNode("bool", parquet.Repetitions.Optional, -1), 0, 0)
829 enc := encoding.NewEncoder(parquet.Types.Boolean, parquet.Encodings.Plain, false, descr, memory.DefaultAllocator)
830 benc := enc.(encoding.BooleanEncoder)
831
832 dec := encoding.NewDecoder(parquet.Types.Boolean, parquet.Encodings.Plain, descr, memory.DefaultAllocator)
833 decSlice := make([]bool, 1)
834 bdec := dec.(encoding.BooleanDecoder)
835
836
837
838
839 benc.Put([]bool{true})
840 buf1, err := benc.FlushValues()
841 assert.NoError(t, err)
842
843 benc.Put([]bool{false})
844 buf2, err := benc.FlushValues()
845 assert.NoError(t, err)
846
847
848 err = bdec.SetData(1, buf1.Buf())
849 assert.NoError(t, err)
850 n, err := bdec.Decode(decSlice)
851 assert.NoError(t, err)
852 assert.Equal(t, n, 1)
853 assert.Equal(t, decSlice[0], true)
854
855
856 err = bdec.SetData(1, buf2.Buf())
857 assert.NoError(t, err)
858 n, err = bdec.Decode(decSlice)
859 assert.NoError(t, err)
860 assert.Equal(t, n, 1)
861 assert.Equal(t, decSlice[0], false)
862 }
863
View as plain text