1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package utils
21
22 import (
23 "bytes"
24 "encoding/binary"
25 "math"
26
27 "github.com/apache/arrow/go/v15/arrow/bitutil"
28 "github.com/apache/arrow/go/v15/internal/bitutils"
29 "github.com/apache/arrow/go/v15/internal/utils"
30 "github.com/apache/arrow/go/v15/parquet"
31 "golang.org/x/xerrors"
32 )
33
34
35
// MaxValuesPerLiteralRun is the number of values assumed for the largest
// literal run when sizing encode buffers (see MinRLEBufferSize): 2^6 groups
// of 8 values each.
const MaxValuesPerLiteralRun = 8 << 6
39
40 func MinRLEBufferSize(bitWidth int) int {
41 maxLiteralRunSize := 1 + bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
42 maxRepeatedRunSize := binary.MaxVarintLen32 + bitutil.BytesForBits(int64(bitWidth))
43 return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize))
44 }
45
46 func MaxRLEBufferSize(width, numValues int) int {
47 bytesPerRun := width
48 numRuns := int(bitutil.BytesForBits(int64(numValues)))
49 literalMaxSize := numRuns + (numRuns * bytesPerRun)
50
51 minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width)))
52 repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * minRepeatedRunSize
53
54 return utils.Max(literalMaxSize, repeatedMaxSize)
55 }
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 type RleDecoder struct {
107 r *BitReader
108
109 bitWidth int
110 curVal uint64
111 repCount int32
112 litCount int32
113 }
114
115 func NewRleDecoder(data *bytes.Reader, width int) *RleDecoder {
116 return &RleDecoder{r: NewBitReader(data), bitWidth: width}
117 }
118
119 func (r *RleDecoder) Reset(data *bytes.Reader, width int) {
120 r.bitWidth = width
121 r.curVal = 0
122 r.repCount = 0
123 r.litCount = 0
124 r.r.Reset(data)
125 }
126
127 func (r *RleDecoder) Next() bool {
128 indicator, ok := r.r.GetVlqInt()
129 if !ok {
130 return false
131 }
132
133 literal := (indicator & 1) != 0
134 count := uint32(indicator >> 1)
135 if literal {
136 if count == 0 || count > uint32(math.MaxInt32/8) {
137 return false
138 }
139 r.litCount = int32(count) * 8
140 } else {
141 if count == 0 || count > uint32(math.MaxInt32) {
142 return false
143 }
144 r.repCount = int32(count)
145
146 nbytes := int(bitutil.BytesForBits(int64(r.bitWidth)))
147 switch {
148 case nbytes > 4:
149 if !r.r.GetAligned(nbytes, &r.curVal) {
150 return false
151 }
152 case nbytes > 2:
153 var val uint32
154 if !r.r.GetAligned(nbytes, &val) {
155 return false
156 }
157 r.curVal = uint64(val)
158 case nbytes > 1:
159 var val uint16
160 if !r.r.GetAligned(nbytes, &val) {
161 return false
162 }
163 r.curVal = uint64(val)
164 default:
165 var val uint8
166 if !r.r.GetAligned(nbytes, &val) {
167 return false
168 }
169 r.curVal = uint64(val)
170 }
171 }
172 return true
173 }
174
175 func (r *RleDecoder) GetValue() (uint64, bool) {
176 vals := make([]uint64, 1)
177 n := r.GetBatch(vals)
178 return vals[0], n == 1
179 }
180
181 func (r *RleDecoder) GetBatch(values []uint64) int {
182 read := 0
183 size := len(values)
184
185 out := values
186 for read < size {
187 remain := size - read
188
189 if r.repCount > 0 {
190 repbatch := int(math.Min(float64(remain), float64(r.repCount)))
191 for i := 0; i < repbatch; i++ {
192 out[i] = r.curVal
193 }
194
195 r.repCount -= int32(repbatch)
196 read += repbatch
197 out = out[repbatch:]
198 } else if r.litCount > 0 {
199 litbatch := int(math.Min(float64(remain), float64(r.litCount)))
200 n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch])
201 if n != litbatch {
202 return read
203 }
204
205 r.litCount -= int32(litbatch)
206 read += litbatch
207 out = out[litbatch:]
208 } else {
209 if !r.Next() {
210 return read
211 }
212 }
213 }
214 return read
215 }
216
217 func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []byte, validBitsOffset int64) (int, error) {
218 if nullcount == 0 {
219 return r.GetBatch(vals), nil
220 }
221
222 converter := plainConverter{}
223 blockCounter := bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
224
225 var (
226 totalProcessed int
227 processed int
228 block bitutils.BitBlockCount
229 err error
230 )
231
232 for {
233 block = blockCounter.NextFourWords()
234 if block.Len == 0 {
235 break
236 }
237
238 if block.AllSet() {
239 processed = r.GetBatch(vals[:block.Len])
240 } else if block.NoneSet() {
241 converter.FillZero(vals[:block.Len])
242 processed = int(block.Len)
243 } else {
244 processed, err = r.getspaced(converter, vals, int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
245 if err != nil {
246 return totalProcessed, err
247 }
248 }
249
250 totalProcessed += processed
251 vals = vals[int(block.Len):]
252 validBitsOffset += int64(block.Len)
253
254 if processed != int(block.Len) {
255 break
256 }
257 }
258 return totalProcessed, nil
259 }
260
261 func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
262 switch vals := vals.(type) {
263 case []int32:
264 return r.getspacedInt32(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
265 case []int64:
266 return r.getspacedInt64(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
267 case []float32:
268 return r.getspacedFloat32(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
269 case []float64:
270 return r.getspacedFloat64(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
271 case []parquet.ByteArray:
272 return r.getspacedByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
273 case []parquet.FixedLenByteArray:
274 return r.getspacedFixedLenByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
275 case []parquet.Int96:
276 return r.getspacedInt96(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
277 case []uint64:
278 return r.getspacedUint64(dc, vals, batchSize, nullCount, validBits, validBitsOffset)
279 default:
280 return 0, xerrors.New("parquet/rle: getspaced invalid type")
281 }
282 }
283
// getspacedUint64 fills batchSize slots of vals. nullCount of those slots are
// null according to validBits, starting at validBitsOffset.
//
// Null runs are zero-filled through dc. Valid slots are filled from the
// decoder's repeated and literal runs. Within a span covered by a repeated
// run, dc.Fill writes the repeated value over the whole span, including any
// null slots inside it.
//
// It returns the number of slots written. That can be less than batchSize
// when Next cannot read another run header, or when a repeated value fails
// dc.IsValid; both cases return a nil error. Errors from
// consumeLiteralsUint64 are returned together with the count written so far.
func (r *RleDecoder) getspacedUint64(dc DictionaryConverter, vals []uint64, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	// Fast path: every slot is null.
	if nullCount == batchSize {
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain counts the non-null slots still waiting for a decoded value.
	remain := batchSize - nullCount

	// Scratch space for literal indices. It caps how many literals are
	// decoded per consumeLiteralsUint64 call.
	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		// Null run: zero-fill it and move past it without touching the RLE stream.
		if !validRun.Set {
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// The current RLE run is exhausted; load the next header.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			// batch spans both valid and null slots covered by the repeated run.
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			dc.Fill(vals[:batch], current)
		case r.litCount > 0:
			var (
				litread int
				skipped int
				err     error
			)
			litread, skipped, validRun, err = r.consumeLiteralsUint64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
346
// consumeRepeatCounts walks the validity runs starting at run while the
// current repeated run has values left and read+consumed is below batchSize.
// Set (non-null) runs consume values from r.repCount. Unset (null) runs are
// spanned without consuming any repeated values.
//
// It returns three values:
//   - the total number of slots spanned, null and non-null;
//   - remain, reduced by the number of non-null slots consumed;
//   - the validity run positioned where it stopped (a fresh run is fetched
//     from bitRdr whenever the current one is used up).
func (r *RleDecoder) consumeRepeatCounts(read, batchSize, remain int, run bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun) {
	repeatBatch := 0
	for r.repCount > 0 && (read+repeatBatch) < batchSize {
		if run.Set {
			// Consume as much of this valid run as the repeated run can cover.
			updateSize := int(utils.Min(run.Len, int64(r.repCount)))
			r.repCount -= int32(updateSize)
			repeatBatch += updateSize
			run.Len -= int64(updateSize)
			remain -= updateSize
		} else {
			// Null slots are included in the span but do not use repeated values.
			repeatBatch += int(run.Len)
			run.Len = 0
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}
	return repeatBatch, remain, run
}
371
// consumeLiteralsUint64 reads batch = min(remain, r.litCount, len(buf))
// indices from the current literal run into buf and validates them with
// dc.IsValid. It then scatters them into vals following the validity runs:
// set runs receive values via dc.Copy, and unset runs are zero-filled via
// dc.FillZero.
//
// It returns four values:
//   - the number of indices written;
//   - the number of null slots filled;
//   - the validity run positioned where it stopped;
//   - an error if fewer than batch indices could be read, if any index fails
//     dc.IsValid, or if dc.Copy fails.
//
// On success r.litCount is reduced by batch.
func (r *RleDecoder) consumeLiteralsUint64(dc DictionaryConverter, vals []uint64, remain int, buf []IndexType, run bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
	batch := utils.Min(utils.Min(remain, int(r.litCount)), len(buf))
	buf = buf[:batch]

	n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
	if n != batch {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}

	if !dc.IsValid(buf...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	var (
		read    int
		skipped int
	)
	// Interleave decoded indices with null slots until every index is placed.
	for read < batch {
		if run.Set {
			updateSize := utils.Min(batch-read, int(run.Len))
			if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil {
				return 0, 0, run, err
			}
			read += updateSize
			vals = vals[updateSize:]
			run.Len -= int64(updateSize)
		} else {
			dc.FillZero(vals[:int(run.Len)])
			vals = vals[int(run.Len):]
			skipped += int(run.Len)
			run.Len = 0
		}
		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}
	r.litCount -= int32(batch)
	return read, skipped, run, nil
}
411
412 func (r *RleDecoder) GetBatchWithDict(dc DictionaryConverter, vals interface{}) (int, error) {
413 switch vals := vals.(type) {
414 case []int32:
415 return r.GetBatchWithDictInt32(dc, vals)
416 case []int64:
417 return r.GetBatchWithDictInt64(dc, vals)
418 case []float32:
419 return r.GetBatchWithDictFloat32(dc, vals)
420 case []float64:
421 return r.GetBatchWithDictFloat64(dc, vals)
422 case []parquet.ByteArray:
423 return r.GetBatchWithDictByteArray(dc, vals)
424 case []parquet.FixedLenByteArray:
425 return r.GetBatchWithDictFixedLenByteArray(dc, vals)
426 case []parquet.Int96:
427 return r.GetBatchWithDictInt96(dc, vals)
428 default:
429 return 0, xerrors.New("parquet/rle: GetBatchWithDict invalid type")
430 }
431 }
432
433 func (r *RleDecoder) GetBatchWithDictSpaced(dc DictionaryConverter, vals interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
434 switch vals := vals.(type) {
435 case []int32:
436 return r.GetBatchWithDictSpacedInt32(dc, vals, nullCount, validBits, validBitsOffset)
437 case []int64:
438 return r.GetBatchWithDictSpacedInt64(dc, vals, nullCount, validBits, validBitsOffset)
439 case []float32:
440 return r.GetBatchWithDictSpacedFloat32(dc, vals, nullCount, validBits, validBitsOffset)
441 case []float64:
442 return r.GetBatchWithDictSpacedFloat64(dc, vals, nullCount, validBits, validBitsOffset)
443 case []parquet.ByteArray:
444 return r.GetBatchWithDictSpacedByteArray(dc, vals, nullCount, validBits, validBitsOffset)
445 case []parquet.FixedLenByteArray:
446 return r.GetBatchWithDictSpacedFixedLenByteArray(dc, vals, nullCount, validBits, validBitsOffset)
447 case []parquet.Int96:
448 return r.GetBatchWithDictSpacedInt96(dc, vals, nullCount, validBits, validBitsOffset)
449 default:
450 return 0, xerrors.New("parquet/rle: GetBatchWithDictSpaced invalid type")
451 }
452 }
453
454 type RleEncoder struct {
455 w *BitWriter
456
457 buffer []uint64
458 BitWidth int
459 curVal uint64
460 repCount int32
461 litCount int32
462 literalIndicatorOffset int
463
464 indicatorBuffer [1]byte
465 }
466
467 func NewRleEncoder(w WriterAtWithLen, width int) *RleEncoder {
468 return &RleEncoder{
469 w: NewBitWriter(w),
470 buffer: make([]uint64, 0, 8),
471 BitWidth: width,
472 literalIndicatorOffset: -1,
473 }
474 }
475
476 func (r *RleEncoder) Flush() int {
477 if r.litCount > 0 || r.repCount > 0 || len(r.buffer) > 0 {
478 allRep := r.litCount == 0 && (r.repCount == int32(len(r.buffer)) || len(r.buffer) == 0)
479 if r.repCount > 0 && allRep {
480 r.flushRepeated()
481 } else {
482
483 for len(r.buffer) != 0 && len(r.buffer) < 8 {
484 r.buffer = append(r.buffer, 0)
485 }
486
487 r.litCount += int32(len(r.buffer))
488 r.flushLiteral(true)
489 r.repCount = 0
490 }
491 }
492 r.w.Flush(false)
493 return r.w.Written()
494 }
495
496 func (r *RleEncoder) flushBuffered(done bool) (err error) {
497 if r.repCount >= 8 {
498
499
500 r.buffer = r.buffer[:0]
501 if r.litCount != 0 {
502
503 err = r.flushLiteral(true)
504 }
505 return
506 }
507
508 r.litCount += int32(len(r.buffer))
509 ngroups := r.litCount / 8
510 if ngroups+1 >= (1 << 6) {
511
512
513 err = r.flushLiteral(true)
514 } else {
515 err = r.flushLiteral(done)
516 }
517 r.repCount = 0
518 return
519 }
520
521 func (r *RleEncoder) flushLiteral(updateIndicator bool) (err error) {
522 if r.literalIndicatorOffset == -1 {
523 r.literalIndicatorOffset, err = r.w.SkipBytes(1)
524 if err != nil {
525 return
526 }
527 }
528
529 for _, val := range r.buffer {
530 if err = r.w.WriteValue(val, uint(r.BitWidth)); err != nil {
531 return
532 }
533 }
534 r.buffer = r.buffer[:0]
535
536 if updateIndicator {
537
538
539
540 ngroups := r.litCount / 8
541 r.indicatorBuffer[0] = byte((ngroups << 1) | 1)
542 _, err = r.w.WriteAt(r.indicatorBuffer[:], int64(r.literalIndicatorOffset))
543 r.literalIndicatorOffset = -1
544 r.litCount = 0
545 }
546 return
547 }
548
549 func (r *RleEncoder) flushRepeated() (ret bool) {
550 indicator := r.repCount << 1
551
552 ret = r.w.WriteVlqInt(uint64(indicator))
553 ret = ret && r.w.WriteAligned(r.curVal, int(bitutil.BytesForBits(int64(r.BitWidth))))
554
555 r.repCount = 0
556 r.buffer = r.buffer[:0]
557 return
558 }
559
560
561
562 func (r *RleEncoder) Put(value uint64) error {
563 if r.curVal == value {
564 r.repCount++
565 if r.repCount > 8 {
566
567
568 return nil
569 }
570 } else {
571 if r.repCount >= 8 {
572 if !r.flushRepeated() {
573 return xerrors.New("failed to flush repeated value")
574 }
575 }
576 r.repCount = 1
577 r.curVal = value
578 }
579
580 r.buffer = append(r.buffer, value)
581 if len(r.buffer) == 8 {
582 return r.flushBuffered(false)
583 }
584 return nil
585 }
586
587 func (r *RleEncoder) Clear() {
588 r.curVal = 0
589 r.repCount = 0
590 r.buffer = r.buffer[:0]
591 r.litCount = 0
592 r.literalIndicatorOffset = -1
593 r.w.Clear()
594 }
595
View as plain text