1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "bytes"
21 "errors"
22 "math"
23 "math/bits"
24 "reflect"
25
26 "github.com/apache/arrow/go/v15/arrow"
27 "github.com/apache/arrow/go/v15/arrow/memory"
28 shared_utils "github.com/apache/arrow/go/v15/internal/utils"
29 "github.com/apache/arrow/go/v15/parquet"
30 "github.com/apache/arrow/go/v15/parquet/internal/utils"
31 )
32
33
34
// deltaBitPackDecoder holds the state shared by the int32 and int64
// DELTA_BINARY_PACKED decoders: the page header fields read by SetData and the
// position inside the current block / miniblock.
type deltaBitPackDecoder struct {
	decoder

	// mem is the allocator used for the deltaBitWidths buffer.
	mem memory.Allocator

	// usedFirst reports whether the header's first value (held in lastVal)
	// has already been handed out by Decode.
	usedFirst bool
	// bitdecoder reads the page bytes set by SetData.
	bitdecoder *utils.BitReader
	// blockSize is the first header field: number of values per block.
	blockSize uint64
	// currentBlockVals counts the values of the current block not yet
	// copied out; zero means the next block header must be read.
	currentBlockVals uint32
	// miniBlocksPerBlock is the second header field.
	miniBlocksPerBlock uint64
	// valsPerMini is blockSize / miniBlocksPerBlock.
	valsPerMini uint32
	// currentMiniBlockVals counts the unpacked values of the current
	// miniblock not yet copied out; zero means the next miniblock must be unpacked.
	currentMiniBlockVals uint32
	// minDelta is read at the start of every block and added to each packed delta.
	minDelta int64
	// miniBlockIdx is the index (within the current block) of the next
	// miniblock to unpack.
	miniBlockIdx uint64

	// deltaBitWidths holds one bit-width byte per miniblock of the current block.
	deltaBitWidths *memory.Buffer
	// deltaBitWidth is the bit width of the miniblock selected most recently.
	deltaBitWidth byte

	// totalValues is the third header field: the value count recorded in the page.
	totalValues uint64
	// lastVal starts as the header's first value and is then the most
	// recently reconstructed value.
	lastVal int64
}
56
57
58 func (d *deltaBitPackDecoder) bytesRead() int64 {
59 return d.bitdecoder.CurOffset()
60 }
61
62 func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem }
63
64
65
66 func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
67
68 if err := d.decoder.SetData(nvalues, data); err != nil {
69 return err
70 }
71
72 d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data))
73 d.currentBlockVals = 0
74 d.currentMiniBlockVals = 0
75 if d.deltaBitWidths == nil {
76 d.deltaBitWidths = memory.NewResizableBuffer(d.mem)
77 }
78
79 var ok bool
80 d.blockSize, ok = d.bitdecoder.GetVlqInt()
81 if !ok {
82 return errors.New("parquet: eof exception")
83 }
84
85 if d.miniBlocksPerBlock, ok = d.bitdecoder.GetVlqInt(); !ok {
86 return errors.New("parquet: eof exception")
87 }
88 if d.miniBlocksPerBlock == 0 {
89 return errors.New("parquet: cannot have zero miniblock per block")
90 }
91
92 if d.totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
93 return errors.New("parquet: eof exception")
94 }
95
96 if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
97 return errors.New("parquet: eof exception")
98 }
99
100 d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock)
101 d.usedFirst = false
102 return nil
103 }
104
105
106 func (d *deltaBitPackDecoder) initBlock() error {
107
108 var ok bool
109 if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
110 return errors.New("parquet: eof exception")
111 }
112
113
114 d.deltaBitWidths.Resize(int(d.miniBlocksPerBlock))
115
116 var err error
117 for i := uint64(0); i < d.miniBlocksPerBlock; i++ {
118 if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
119 return err
120 }
121 }
122
123 d.miniBlockIdx = 0
124 d.deltaBitWidth = d.deltaBitWidths.Bytes()[0]
125 d.currentBlockVals = uint32(d.blockSize)
126 return nil
127 }
128
129
// DeltaBitPackInt32Decoder decodes DELTA_BINARY_PACKED data into int32 values.
type DeltaBitPackInt32Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the reconstructed values of the miniblock
	// unpacked most recently; its storage is reused between miniblocks.
	miniBlockValues []int32
}
135
136 func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
137 if d.miniBlockValues == nil {
138 d.miniBlockValues = make([]int32, 0, int(d.valsPerMini))
139 } else {
140 d.miniBlockValues = d.miniBlockValues[:0]
141 }
142 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
143 d.currentMiniBlockVals = d.valsPerMini
144
145 for j := 0; j < int(d.valsPerMini); j++ {
146 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
147 if !ok {
148 return errors.New("parquet: eof exception")
149 }
150
151 d.lastVal += int64(delta) + int64(d.minDelta)
152 d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal))
153 }
154 d.miniBlockIdx++
155 return nil
156 }
157
158
159
160 func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
161 max := shared_utils.Min(len(out), int(d.totalValues))
162 if max == 0 {
163 return 0, nil
164 }
165
166 out = out[:max]
167 if !d.usedFirst {
168 out[0] = int32(d.lastVal)
169 out = out[1:]
170 d.usedFirst = true
171 }
172
173 var err error
174 for len(out) > 0 {
175 if d.currentBlockVals == 0 {
176 err = d.initBlock()
177 if err != nil {
178 return 0, err
179 }
180 }
181 if d.currentMiniBlockVals == 0 {
182 err = d.unpackNextMini()
183 }
184 if err != nil {
185 return 0, err
186 }
187
188
189 start := int(d.valsPerMini - d.currentMiniBlockVals)
190 numCopied := copy(out, d.miniBlockValues[start:])
191
192 out = out[numCopied:]
193 d.currentBlockVals -= uint32(numCopied)
194 d.currentMiniBlockVals -= uint32(numCopied)
195 }
196 d.nvals -= max
197 return max, nil
198 }
199
200
201 func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
202 toread := len(out) - nullCount
203 values, err := d.Decode(out[:toread])
204 if err != nil {
205 return values, err
206 }
207 if values != toread {
208 return values, errors.New("parquet: number of values / definition levels read did not match")
209 }
210
211 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
212 }
213
214
215 func (DeltaBitPackInt32Decoder) Type() parquet.Type {
216 return parquet.Types.Int32
217 }
218
219
// DeltaBitPackInt64Decoder decodes DELTA_BINARY_PACKED data into int64 values.
type DeltaBitPackInt64Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the reconstructed values of the miniblock
	// unpacked most recently; its storage is reused between miniblocks.
	miniBlockValues []int64
}
225
226 func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
227 if d.miniBlockValues == nil {
228 d.miniBlockValues = make([]int64, 0, int(d.valsPerMini))
229 } else {
230 d.miniBlockValues = d.miniBlockValues[:0]
231 }
232
233 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
234 d.currentMiniBlockVals = d.valsPerMini
235
236 for j := 0; j < int(d.valsPerMini); j++ {
237 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
238 if !ok {
239 return errors.New("parquet: eof exception")
240 }
241
242 d.lastVal += int64(delta) + d.minDelta
243 d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
244 }
245 d.miniBlockIdx++
246 return nil
247 }
248
249
250
251 func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
252 max := shared_utils.Min(len(out), d.nvals)
253 if max == 0 {
254 return 0, nil
255 }
256
257 out = out[:max]
258 if !d.usedFirst {
259 out[0] = d.lastVal
260 out = out[1:]
261 d.usedFirst = true
262 }
263
264 var err error
265 for len(out) > 0 {
266 if d.currentBlockVals == 0 {
267 err = d.initBlock()
268 if err != nil {
269 return 0, err
270 }
271 }
272 if d.currentMiniBlockVals == 0 {
273 err = d.unpackNextMini()
274 }
275
276 if err != nil {
277 return 0, err
278 }
279
280 start := int(d.valsPerMini - d.currentMiniBlockVals)
281 numCopied := copy(out, d.miniBlockValues[start:])
282
283 out = out[numCopied:]
284 d.currentBlockVals -= uint32(numCopied)
285 d.currentMiniBlockVals -= uint32(numCopied)
286 }
287 d.nvals -= max
288 return max, nil
289 }
290
291
292 func (DeltaBitPackInt64Decoder) Type() parquet.Type {
293 return parquet.Types.Int64
294 }
295
296
297 func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
298 toread := len(out) - nullCount
299 values, err := d.Decode(out[:toread])
300 if err != nil {
301 return values, err
302 }
303 if values != toread {
304 return values, errors.New("parquet: number of values / definition levels read did not match")
305 }
306
307 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
308 }
309
// Layout parameters used by deltaBitPackEncoder.
const (
	// defaultBlockSize is the number of deltas buffered before a block is flushed.
	defaultBlockSize = 128
	// defaultNumMiniBlocks is the number of miniblocks written per block.
	defaultNumMiniBlocks = 4
	// defaultNumValuesPerMini is the number of deltas per miniblock
	// (defaultBlockSize / defaultNumMiniBlocks).
	defaultNumValuesPerMini = 32
	// maxHeaderWriterSize is the size in bytes of the scratch buffer that
	// FlushValues uses to write the page header.
	maxHeaderWriterSize = 32
)
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// deltaBitPackEncoder holds the state shared by the int32 and int64
// DELTA_BINARY_PACKED encoders.
type deltaBitPackEncoder struct {
	encoder

	// bitWriter writes encoded blocks to enc.sink; it is created by
	// putInternal when the first value of a page arrives and is nil before that.
	bitWriter *utils.BitWriter
	// totalVals counts the values put since the last FlushValues.
	totalVals uint64
	// firstVal is the first value of the page; it is written in the header.
	firstVal int64
	// currentVal is the most recently put value, used to compute the next delta.
	currentVal int64

	// blockSize is the number of deltas per block.
	blockSize uint64
	// miniBlockSize is the number of deltas per miniblock.
	miniBlockSize uint64
	// numMiniBlocks is the number of miniblocks per block.
	numMiniBlocks uint64
	// deltas buffers the deltas of the block currently being filled.
	deltas []int64
}
349
350
351 func (enc *deltaBitPackEncoder) flushBlock() {
352 if len(enc.deltas) == 0 {
353 return
354 }
355
356
357 minDelta := int64(math.MaxInt64)
358 for _, delta := range enc.deltas {
359 if delta < minDelta {
360 minDelta = delta
361 }
362 }
363
364 enc.bitWriter.WriteZigZagVlqInt(minDelta)
365
366 offset, _ := enc.bitWriter.SkipBytes(int(enc.numMiniBlocks))
367
368 valuesToWrite := int64(len(enc.deltas))
369 for i := 0; i < int(enc.numMiniBlocks); i++ {
370 n := shared_utils.Min(int64(enc.miniBlockSize), valuesToWrite)
371 if n == 0 {
372 break
373 }
374
375 maxDelta := int64(math.MinInt64)
376 start := i * int(enc.miniBlockSize)
377 for _, val := range enc.deltas[start : start+int(n)] {
378 maxDelta = shared_utils.Max(maxDelta, val)
379 }
380
381
382 width := uint(bits.Len64(uint64(maxDelta - minDelta)))
383
384 enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))
385
386
387 for _, val := range enc.deltas[start : start+int(n)] {
388 enc.bitWriter.WriteValue(uint64(val-minDelta), width)
389 }
390
391 valuesToWrite -= n
392
393
394 for ; n < int64(enc.miniBlockSize); n++ {
395 enc.bitWriter.WriteValue(0, width)
396 }
397 }
398 enc.deltas = enc.deltas[:0]
399 }
400
401
402
403 func (enc *deltaBitPackEncoder) putInternal(data interface{}) {
404 v := reflect.ValueOf(data)
405 if v.Len() == 0 {
406 return
407 }
408
409 idx := 0
410 if enc.totalVals == 0 {
411 enc.blockSize = defaultBlockSize
412 enc.numMiniBlocks = defaultNumMiniBlocks
413 enc.miniBlockSize = defaultNumValuesPerMini
414
415 enc.firstVal = v.Index(0).Int()
416 enc.currentVal = enc.firstVal
417 idx = 1
418
419 enc.bitWriter = utils.NewBitWriter(enc.sink)
420 }
421
422 enc.totalVals += uint64(v.Len())
423 for ; idx < v.Len(); idx++ {
424 val := v.Index(idx).Int()
425 enc.deltas = append(enc.deltas, val-enc.currentVal)
426 enc.currentVal = val
427 if len(enc.deltas) == int(enc.blockSize) {
428 enc.flushBlock()
429 }
430 }
431 }
432
433
434
435 func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) {
436 if enc.bitWriter != nil {
437
438 enc.flushBlock()
439 enc.bitWriter.Flush(true)
440 } else {
441 enc.blockSize = defaultBlockSize
442 enc.numMiniBlocks = defaultNumMiniBlocks
443 enc.miniBlockSize = defaultNumValuesPerMini
444 }
445
446 buffer := make([]byte, maxHeaderWriterSize)
447 headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer))
448
449 headerWriter.WriteVlqInt(uint64(enc.blockSize))
450 headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks))
451 headerWriter.WriteVlqInt(uint64(enc.totalVals))
452 headerWriter.WriteZigZagVlqInt(int64(enc.firstVal))
453 headerWriter.Flush(false)
454
455 buffer = buffer[:headerWriter.Written()]
456 enc.totalVals = 0
457
458 if enc.bitWriter != nil {
459 flushed := enc.sink.Finish()
460 defer flushed.Release()
461
462 buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...)
463 }
464 return poolBuffer{memory.NewBufferBytes(buffer)}, nil
465 }
466
467
468 func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 {
469 return int64(enc.bitWriter.Written())
470 }
471
472
// DeltaBitPackInt32Encoder encodes int32 values as DELTA_BINARY_PACKED data.
// All state lives in the embedded *deltaBitPackEncoder.
type DeltaBitPackInt32Encoder struct {
	*deltaBitPackEncoder
}
476
477
478 func (enc DeltaBitPackInt32Encoder) Put(in []int32) {
479 enc.putInternal(in)
480 }
481
482
483
484 func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) {
485 buffer := memory.NewResizableBuffer(enc.mem)
486 buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in)))
487 defer buffer.Release()
488
489 data := arrow.Int32Traits.CastFromBytes(buffer.Buf())
490 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
491 enc.Put(data[:nvalid])
492 }
493
494
495 func (DeltaBitPackInt32Encoder) Type() parquet.Type {
496 return parquet.Types.Int32
497 }
498
499
// DeltaBitPackInt64Encoder encodes int64 values as DELTA_BINARY_PACKED data.
// All state lives in the embedded *deltaBitPackEncoder.
type DeltaBitPackInt64Encoder struct {
	*deltaBitPackEncoder
}
503
504
505 func (enc DeltaBitPackInt64Encoder) Put(in []int64) {
506 enc.putInternal(in)
507 }
508
509
510
511 func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) {
512 buffer := memory.NewResizableBuffer(enc.mem)
513 buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in)))
514 defer buffer.Release()
515
516 data := arrow.Int64Traits.CastFromBytes(buffer.Buf())
517 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
518 enc.Put(data[:nvalid])
519 }
520
521
522 func (DeltaBitPackInt64Encoder) Type() parquet.Type {
523 return parquet.Types.Int64
524 }
525
View as plain text