1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package utils_test
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "fmt"
23 "math"
24 "math/bits"
25 "strconv"
26 "testing"
27
28 "github.com/apache/arrow/go/v15/arrow"
29 "github.com/apache/arrow/go/v15/arrow/array"
30 "github.com/apache/arrow/go/v15/arrow/bitutil"
31 "github.com/apache/arrow/go/v15/arrow/memory"
32 "github.com/apache/arrow/go/v15/parquet/internal/utils"
33 "github.com/stretchr/testify/assert"
34 "github.com/stretchr/testify/suite"
35 "golang.org/x/exp/rand"
36 "gonum.org/v1/gonum/stat/distuv"
37 )
38
39 func TestBitWriter(t *testing.T) {
40 buf := make([]byte, 8)
41 bw := utils.NewBitWriter(utils.NewWriterAtBuffer(buf))
42
43 for i := 0; i < 8; i++ {
44 assert.Nil(t, bw.WriteValue(uint64(i%2), 1))
45 }
46 bw.Flush(false)
47
48 assert.Equal(t, byte(0xAA), buf[0])
49
50 for i := 0; i < 8; i++ {
51 switch i {
52 case 0, 1, 4, 5:
53 assert.Nil(t, bw.WriteValue(0, 1))
54 default:
55 assert.Nil(t, bw.WriteValue(1, 1))
56 }
57 }
58 bw.Flush(false)
59
60 assert.Equal(t, byte(0xAA), buf[0])
61 assert.Equal(t, byte(0xCC), buf[1])
62 }
63
64 func TestBitReader(t *testing.T) {
65 buf := []byte{0xAA, 0xCC}
66
67 reader := utils.NewBitReader(bytes.NewReader(buf))
68 for i := 0; i < 8; i++ {
69 val, ok := reader.GetValue(1)
70 assert.True(t, ok)
71 assert.Equalf(t, (i%2) != 0, val != 0, "val: %d, i: %d", val, i)
72 }
73
74 for i := 0; i < 8; i++ {
75 val, ok := reader.GetValue(1)
76 assert.True(t, ok)
77 switch i {
78 case 0, 1, 4, 5:
79 assert.EqualValues(t, 0, val)
80 default:
81 assert.EqualValues(t, 1, val)
82 }
83 }
84 }
85
86 func TestBitArrayVals(t *testing.T) {
87 tests := []struct {
88 name string
89 nvals func(uint) int
90 }{
91 {"1 value", func(uint) int { return 1 }},
92 {"2 values", func(uint) int { return 2 }},
93 {"larger", func(w uint) int {
94 if w < 12 {
95 return 1 << w
96 }
97 return 4096
98 }},
99 {"1024 values", func(uint) int { return 1024 }},
100 }
101
102 for width := uint(1); width < 32; width++ {
103 t.Run(fmt.Sprintf("BitWriter Width %d", width), func(t *testing.T) {
104 for _, tt := range tests {
105 t.Run(tt.name, func(t *testing.T) {
106 var (
107 nvals = tt.nvals(width)
108 mod uint64 = 1
109 )
110 l := bitutil.BytesForBits(int64(int(width) * nvals))
111 assert.Greater(t, l, int64(0))
112
113 if width != 64 {
114 mod = uint64(1) << width
115 }
116
117 buf := make([]byte, l)
118 bw := utils.NewBitWriter(utils.NewWriterAtBuffer(buf))
119 for i := 0; i < nvals; i++ {
120 assert.Nil(t, bw.WriteValue(uint64(i)%mod, width))
121 }
122 bw.Flush(false)
123 assert.Equal(t, l, int64(bw.Written()))
124
125 br := utils.NewBitReader(bytes.NewReader(buf))
126 for i := 0; i < nvals; i++ {
127 val, ok := br.GetValue(int(width))
128 assert.True(t, ok)
129 assert.Equal(t, uint64(i)%mod, val)
130 }
131 })
132 }
133 })
134 }
135 }
136
137 func TestMixedValues(t *testing.T) {
138 const buflen = 1024
139 buf := make([]byte, buflen)
140 parity := true
141
142 bw := utils.NewBitWriter(utils.NewWriterAtBuffer(buf))
143 for i := 0; i < buflen; i++ {
144 if i%2 == 0 {
145 v := uint64(1)
146 if !parity {
147 v = 0
148 }
149 assert.Nil(t, bw.WriteValue(v, 1))
150 parity = !parity
151 } else {
152 assert.Nil(t, bw.WriteValue(uint64(i), 10))
153 }
154 }
155 bw.Flush(false)
156
157 parity = true
158 br := utils.NewBitReader(bytes.NewReader(buf))
159 for i := 0; i < buflen; i++ {
160 if i%2 == 0 {
161 val, ok := br.GetValue(1)
162 assert.True(t, ok)
163 exp := uint64(1)
164 if !parity {
165 exp = 0
166 }
167 assert.Equal(t, exp, val)
168 parity = !parity
169 } else {
170 val, ok := br.GetValue(10)
171 assert.True(t, ok)
172 assert.Equal(t, uint64(i), val)
173 }
174 }
175 }
176
177 func TestZigZag(t *testing.T) {
178 testvals := []struct {
179 val int64
180 exp [10]byte
181 }{
182 {0, [...]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}},
183 {1, [...]byte{2, 0, 0, 0, 0, 0, 0, 0, 0, 0}},
184 {1234, [...]byte{164, 19, 0, 0, 0, 0, 0, 0, 0, 0}},
185 {-1, [...]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0}},
186 {-1234, [...]byte{163, 19, 0, 0, 0, 0, 0, 0, 0, 0}},
187 {math.MaxInt32, [...]byte{254, 255, 255, 255, 15, 0, 0, 0, 0, 0}},
188 {-math.MaxInt32, [...]byte{253, 255, 255, 255, 15, 0, 0, 0, 0, 0}},
189 {math.MinInt32, [...]byte{255, 255, 255, 255, 15, 0, 0, 0, 0, 0}},
190 {math.MaxInt64, [...]byte{254, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
191 {-math.MaxInt64, [...]byte{253, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
192 {math.MinInt64, [...]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
193 }
194
195 for _, v := range testvals {
196 t.Run(strconv.Itoa(int(v.val)), func(t *testing.T) {
197 var buf [binary.MaxVarintLen64]byte
198 wrtr := utils.NewBitWriter(utils.NewWriterAtBuffer(buf[:]))
199 assert.True(t, wrtr.WriteZigZagVlqInt(v.val))
200 wrtr.Flush(false)
201
202 assert.Equal(t, v.exp, buf)
203
204 rdr := utils.NewBitReader(bytes.NewReader(buf[:]))
205 val, ok := rdr.GetZigZagVlqInt()
206 assert.True(t, ok)
207 assert.EqualValues(t, v.val, val)
208 })
209 }
210 }
211
// buflen is the package-level default buffer length. SetupTest uses it as the
// capacity of RLETestSuite.expectedBuf. Several tests in this file declare
// their own local buflen, which shadows this one.
const buflen = 1024
213
// RLETestSuite groups the deterministic RLE encoder/decoder tests.
// SetupTest reinitialises both fields before each test method.
type RLETestSuite struct {
	suite.Suite

	// expectedBuf holds the exact encoded bytes a test expects, for tests
	// that pin the encoding.
	expectedBuf []byte
	// values is the input sequence handed to ValidateRle.
	values []uint64
}
220
// RLERandomSuite groups the RLE round-trip tests that are driven by randomly
// generated input (TestRandomSequences, TestGetBatchSpaced).
type RLERandomSuite struct {
	suite.Suite
}
224
225 func TestRLE(t *testing.T) {
226 suite.Run(t, new(RLETestSuite))
227 }
228
229 func TestRleRandom(t *testing.T) {
230 suite.Run(t, new(RLERandomSuite))
231 }
232
233 func (r *RLETestSuite) ValidateRle(vals []uint64, width int, expected []byte, explen int) {
234 const buflen = 64 * 1024
235 buf := make([]byte, buflen)
236
237 r.Run("test encode", func() {
238 r.LessOrEqual(explen, buflen)
239
240 enc := utils.NewRleEncoder(utils.NewWriterAtBuffer(buf), width)
241 for _, val := range vals {
242 r.NoError(enc.Put(val))
243 }
244 encoded := enc.Flush()
245 if explen != -1 {
246 r.Equal(explen, encoded)
247 }
248
249 if expected != nil {
250 r.Equal(expected, buf[:encoded])
251 }
252 })
253
254 r.Run("decode read", func() {
255 dec := utils.NewRleDecoder(bytes.NewReader(buf), width)
256 for _, val := range vals {
257 v, ok := dec.GetValue()
258 r.True(ok)
259 r.Equal(val, v)
260 }
261 })
262
263 r.Run("decode batch read", func() {
264 dec := utils.NewRleDecoder(bytes.NewReader(buf), width)
265 check := make([]uint64, len(vals))
266 r.Equal(len(vals), dec.GetBatch(check))
267 r.Equal(vals, check)
268 })
269 }
270
271 func (r *RLETestSuite) SetupTest() {
272 r.expectedBuf = make([]byte, 0, buflen)
273 r.values = make([]uint64, 100)
274 }
275
276 func (r *RLETestSuite) Test50Zeros50Ones() {
277 for i := 0; i < 50; i++ {
278 r.values[i] = 0
279 }
280 for i := 50; i < 100; i++ {
281 r.values[i] = 1
282 }
283
284 r.expectedBuf = append(r.expectedBuf, []byte{50 << 1, 0, 50 << 1, 1}...)
285 for width := 1; width <= 8; width++ {
286 r.Run(fmt.Sprintf("bitwidth: %d", width), func() {
287 r.ValidateRle(r.values, width, r.expectedBuf, 4)
288 })
289 }
290
291 for width := 9; width <= 32; width++ {
292 r.Run(fmt.Sprintf("bitwidth: %d", width), func() {
293 r.ValidateRle(r.values, width, nil, int(2*(1+bitutil.BytesForBits(int64(width)))))
294 })
295 }
296 }
297
298 func (r *RLETestSuite) Test100ZerosOnesAlternating() {
299 for idx := range r.values {
300 r.values[idx] = uint64(idx % 2)
301 }
302
303 ngroups := bitutil.BytesForBits(100)
304 r.expectedBuf = r.expectedBuf[:ngroups+1]
305 r.expectedBuf[0] = byte(ngroups<<1) | 1
306 for i := 1; i <= 100/8; i++ {
307 r.expectedBuf[i] = 0xAA
308 }
309 r.expectedBuf[100/8+1] = 0x0A
310
311 r.Run("width: 1", func() {
312 r.ValidateRle(r.values, 1, r.expectedBuf, int(1+ngroups))
313 })
314 for width := 2; width < 32; width++ {
315 r.Run(fmt.Sprintf("width: %d", width), func() {
316 nvalues := bitutil.BytesForBits(100) * 8
317 r.ValidateRle(r.values, width, nil, int(1+bitutil.BytesForBits(int64(width)*nvalues)))
318 })
319 }
320 }
321
322 func (r *RLETestSuite) Test16BitValues() {
323
324 r.values = r.values[:28]
325 for i := 0; i < 16; i++ {
326 r.values[i] = 0x55aa
327 }
328 for i := 16; i < 28; i++ {
329 r.values[i] = 0xaa55
330 }
331
332 r.expectedBuf = append(r.expectedBuf, []byte{
333 16 << 1, 0xaa, 0x55, 12 << 1, 0x55, 0xaa,
334 }...)
335
336 r.ValidateRle(r.values, 16, r.expectedBuf, 6)
337 }
338
339 func (r *RLETestSuite) Test32BitValues() {
340
341 r.values = r.values[:28]
342 for i := 0; i < 16; i++ {
343 r.values[i] = 0x555aaaa5
344 }
345 for i := 16; i < 28; i++ {
346 r.values[i] = 0x5aaaa555
347 }
348
349 r.expectedBuf = append(r.expectedBuf, []byte{
350 16 << 1, 0xa5, 0xaa, 0x5a, 0x55,
351 12 << 1, 0x55, 0xa5, 0xaa, 0x5a,
352 }...)
353
354 r.ValidateRle(r.values, 32, r.expectedBuf, 10)
355 }
356
357 func (r *RLETestSuite) TestRleValues() {
358 tests := []struct {
359 name string
360 nvals int
361 val int
362 }{
363 {"1", 1, -1},
364 {"1024", 1024, -1},
365 {"1024 0", 1024, 0},
366 {"1024 1", 1024, 1},
367 }
368
369 for width := 1; width <= 32; width++ {
370 r.Run(fmt.Sprintf("width %d", width), func() {
371 for _, tt := range tests {
372 r.Run(tt.name, func() {
373
374 var mod uint64 = 1
375 if width != 64 {
376 mod = uint64(1) << width
377 }
378
379 r.values = r.values[:0]
380
381 for v := 0; v < tt.nvals; v++ {
382 if tt.val != -1 {
383 r.values = append(r.values, uint64(tt.val))
384 } else {
385 r.values = append(r.values, uint64(v)%mod)
386 }
387 }
388 r.ValidateRle(r.values, width, nil, -1)
389 })
390 }
391 })
392 }
393 }
394
395
396
397 func (r *RLETestSuite) TestBitRleFlush() {
398 vals := make([]uint64, 0, 16)
399 for i := 0; i < 16; i++ {
400 vals = append(vals, 1)
401 }
402 vals = append(vals, 0)
403 r.ValidateRle(vals, 1, nil, -1)
404 vals = append(vals, 1)
405 r.ValidateRle(vals, 1, nil, -1)
406 vals = append(vals, 1)
407 r.ValidateRle(vals, 1, nil, -1)
408 vals = append(vals, 1)
409 r.ValidateRle(vals, 1, nil, -1)
410 }
411
412 func (r *RLETestSuite) TestRepeatedPattern() {
413 r.values = r.values[:0]
414 const minrun = 1
415 const maxrun = 32
416
417 for i := minrun; i <= maxrun; i++ {
418 v := i % 2
419 for j := 0; j < i; j++ {
420 r.values = append(r.values, uint64(v))
421 }
422 }
423
424
425 for i := maxrun; i >= minrun; i-- {
426 v := i % 2
427 for j := 0; j < i; j++ {
428 r.values = append(r.values, uint64(v))
429 }
430 }
431
432 r.ValidateRle(r.values, 1, nil, -1)
433 }
434
435 func TestBitWidthZeroRepeated(t *testing.T) {
436 buf := make([]byte, 1)
437 const nvals = 15
438 buf[0] = nvals << 1
439 dec := utils.NewRleDecoder(bytes.NewReader(buf), 0)
440 for i := 0; i < nvals; i++ {
441 val, ok := dec.GetValue()
442 assert.True(t, ok)
443 assert.Zero(t, val)
444 }
445 _, ok := dec.GetValue()
446 assert.False(t, ok)
447 }
448
449 func TestBitWidthZeroLiteral(t *testing.T) {
450 const ngroups = 4
451 buf := []byte{4<<1 | 1}
452 dec := utils.NewRleDecoder(bytes.NewReader(buf), 0)
453 const nvals = ngroups * 8
454 for i := 0; i < nvals; i++ {
455 val, ok := dec.GetValue()
456 assert.True(t, ok)
457 assert.Zero(t, val)
458 }
459 _, ok := dec.GetValue()
460 assert.False(t, ok)
461 }
462
463 func (r *RLERandomSuite) checkRoundTrip(vals []uint64, width int) bool {
464 const buflen = 64 * 1024
465 buf := make([]byte, buflen)
466 var encoded int
467
468 res := r.Run("encode values", func() {
469 enc := utils.NewRleEncoder(utils.NewWriterAtBuffer(buf), width)
470 for idx, val := range vals {
471 r.Require().NoErrorf(enc.Put(val), "encoding idx: %d", idx)
472 }
473 encoded = enc.Flush()
474 })
475
476 res = res && r.Run("decode individual", func() {
477 dec := utils.NewRleDecoder(bytes.NewReader(buf[:encoded]), width)
478 for idx, val := range vals {
479 out, ok := dec.GetValue()
480 r.True(ok)
481 r.Require().Equalf(out, val, "mismatch idx: %d", idx)
482 }
483 })
484
485 res = res && r.Run("batch decode", func() {
486 dec := utils.NewRleDecoder(bytes.NewReader(buf[:encoded]), width)
487 read := make([]uint64, len(vals))
488 r.Require().Equal(len(vals), dec.GetBatch(read))
489 r.Equal(vals, read)
490 })
491
492 return res
493 }
494
495 func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) {
496 nvalues := vals.Len()
497 bufsize := utils.MaxRLEBufferSize(width, nvalues)
498
499 buffer := make([]byte, bufsize)
500 encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width)
501
502 switch v := vals.(type) {
503 case *array.Int32:
504 for i := 0; i < v.Len(); i++ {
505 if v.IsValid(i) {
506 r.Require().NoError(encoder.Put(uint64(v.Value(i))))
507 }
508 }
509 }
510
511 encodedSize := encoder.Flush()
512
513
514 decoder := utils.NewRleDecoder(bytes.NewReader(buffer[:encodedSize]), width)
515 valuesRead := make([]uint64, nvalues)
516 val, err := decoder.GetBatchSpaced(valuesRead, vals.NullN(), vals.NullBitmapBytes(), int64(vals.Data().Offset()))
517 r.NoError(err)
518 r.EqualValues(nvalues, val)
519
520 switch v := vals.(type) {
521 case *array.Int32:
522 for i := 0; i < nvalues; i++ {
523 if vals.IsValid(i) {
524 r.EqualValues(v.Value(i), valuesRead[i])
525 }
526 }
527 }
528 }
529
530 func (r *RLERandomSuite) TestRandomSequences() {
531 const niters = 50
532 const ngroups = 1000
533 const maxgroup = 16
534
535 values := make([]uint64, ngroups+maxgroup)
536 seed := rand.Uint64() ^ (rand.Uint64() << 32)
537 gen := rand.New(rand.NewSource(seed))
538
539 for itr := 0; itr < niters; itr++ {
540 parity := false
541 values = values[:0]
542
543 for i := 0; i < ngroups; i++ {
544 groupsize := gen.Intn(19) + 1
545 if groupsize > maxgroup {
546 groupsize = 1
547 }
548
549 v := uint64(0)
550 if parity {
551 v = 1
552 }
553 for j := 0; j < groupsize; j++ {
554 values = append(values, v)
555 }
556 parity = !parity
557 }
558 r.Require().Truef(r.checkRoundTrip(values, bits.Len(uint(len(values)))), "failing seed: %d", seed)
559 }
560 }
561
// RandomArrayGenerator produces reproducible random Arrow arrays for tests.
// Each generated component (validity bitmap, values) draws from a fresh
// source seeded with seed+extra, and extra is incremented first.
//
// NOTE: NewRandomArrayGenerator builds this struct with a positional
// composite literal, so the field order here is significant.
type RandomArrayGenerator struct {
	// seed is the base seed for every derived source.
	seed uint64
	// extra is bumped before each derived source is created.
	extra uint64
	// src and seedRand are set by the constructor but not read elsewhere in
	// this file.
	src      rand.Source
	seedRand *rand.Rand
}
568
569 func NewRandomArrayGenerator(seed uint64) RandomArrayGenerator {
570 src := rand.NewSource(seed)
571 return RandomArrayGenerator{seed, 0, src, rand.New(src)}
572 }
573
574 func (r *RandomArrayGenerator) generateBitmap(buffer []byte, n int64, prob float64) int64 {
575 count := int64(0)
576 r.extra++
577
578 dist := distuv.Bernoulli{P: prob, Src: rand.NewSource(r.seed + r.extra)}
579 for i := int(0); int64(i) < n; i++ {
580 if dist.Rand() != float64(0.0) {
581 bitutil.SetBit(buffer, i)
582 } else {
583 count++
584 }
585 }
586
587 return count
588 }
589
590 func (r *RandomArrayGenerator) Int32(size int64, min, max int32, prob float64) arrow.Array {
591 buffers := make([]*memory.Buffer, 2)
592 nullCount := int64(0)
593
594 buffers[0] = memory.NewResizableBuffer(memory.DefaultAllocator)
595 buffers[0].Resize(int(bitutil.BytesForBits(size)))
596 nullCount = r.generateBitmap(buffers[0].Bytes(), size, prob)
597
598 buffers[1] = memory.NewResizableBuffer(memory.DefaultAllocator)
599 buffers[1].Resize(int(size * int64(arrow.Int32SizeBytes)))
600
601 r.extra++
602 dist := rand.New(rand.NewSource(r.seed + r.extra))
603 out := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
604 for i := int64(0); i < size; i++ {
605 out[i] = int32(dist.Int31n(max-min+1)) + min
606 }
607
608 return array.NewInt32Data(array.NewData(arrow.PrimitiveTypes.Int32, int(size), buffers, nil, int(nullCount), 0))
609 }
610
611 func (r *RLERandomSuite) TestGetBatchSpaced() {
612 seed := uint64(1337)
613
614 rng := NewRandomArrayGenerator(seed)
615
616 tests := []struct {
617 name string
618 max int32
619 size int64
620 nullProb float64
621 bitWidth int
622 }{
623 {"all ones 0.01 nullprob width 1", 1, 100000, 0.01, 1},
624 {"all ones 0.1 nullprob width 1", 1, 100000, 0.1, 1},
625 {"all ones 0.5 nullprob width 1", 1, 100000, 0.5, 1},
626 {"max 4 0.05 nullprob width 3", 4, 100000, 0.05, 3},
627 {"max 100 0.05 nullprob width 7", 100, 100000, 0.05, 7},
628 }
629
630 for _, tt := range tests {
631 r.Run(tt.name, func() {
632 arr := rng.Int32(tt.size, 0, tt.max, tt.nullProb)
633 r.checkRoundTripSpaced(arr, tt.bitWidth)
634 r.checkRoundTripSpaced(array.NewSlice(arr, 1, int64(arr.Len())), tt.bitWidth)
635 })
636 }
637 }
638
View as plain text