...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "encoding/binary"
21
22 "github.com/apache/arrow/go/v15/arrow/bitutil"
23 "github.com/apache/arrow/go/v15/parquet"
24 "github.com/apache/arrow/go/v15/parquet/internal/debug"
25 "github.com/apache/arrow/go/v15/parquet/internal/utils"
26 )
27
const (
	// boolBufSize is the size, in bytes, of the scratch bitmap that
	// PlainBooleanEncoder packs values into before copying them out.
	boolBufSize = 1 << 10
	// boolsInBuf is the number of one-bit boolean values that fit in a
	// scratch bitmap of boolBufSize bytes.
	boolsInBuf = 8 * boolBufSize
)
32
33
34 type PlainBooleanEncoder struct {
35 encoder
36 bitsBuffer []byte
37 wr utils.BitmapWriter
38 }
39
40
41 func (PlainBooleanEncoder) Type() parquet.Type {
42 return parquet.Types.Boolean
43 }
44
45
46 func (enc *PlainBooleanEncoder) Put(in []bool) {
47 if enc.bitsBuffer == nil {
48 enc.bitsBuffer = make([]byte, boolBufSize)
49 }
50 if enc.wr == nil {
51 enc.wr = utils.NewBitmapWriter(enc.bitsBuffer, 0, boolsInBuf)
52 }
53 if len(in) == 0 {
54 return
55 }
56
57 n := enc.wr.AppendBools(in)
58 for n < len(in) {
59 enc.wr.Finish()
60 enc.append(enc.bitsBuffer)
61 enc.wr.Reset(0, boolsInBuf)
62 in = in[n:]
63 n = enc.wr.AppendBools(in)
64 }
65 }
66
67
68
69 func (enc *PlainBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
70 bufferOut := make([]bool, len(in))
71 nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
72 enc.Put(bufferOut[:nvalid])
73 }
74
75
76
77 func (enc *PlainBooleanEncoder) EstimatedDataEncodedSize() int64 {
78 return int64(enc.sink.Len() + int(bitutil.BytesForBits(int64(enc.wr.Pos()))))
79 }
80
81
82
83 func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) {
84 if enc.wr.Pos() > 0 {
85 toFlush := int(enc.wr.Pos())
86 enc.append(enc.bitsBuffer[:bitutil.BytesForBits(int64(toFlush))])
87 }
88
89 enc.wr.Reset(0, boolsInBuf)
90
91 return enc.sink.Finish(), nil
92 }
93
// rleLengthInBytes is the size of the little-endian uint32 length prefix that
// RleBooleanEncoder.FlushValues writes in front of the RLE-encoded payload.
const rleLengthInBytes = 4
95
96 type RleBooleanEncoder struct {
97 encoder
98
99 bufferedValues []bool
100 }
101
102 func (RleBooleanEncoder) Type() parquet.Type {
103 return parquet.Types.Boolean
104 }
105
106 func (enc *RleBooleanEncoder) Put(in []bool) {
107 enc.bufferedValues = append(enc.bufferedValues, in...)
108 }
109
110 func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
111 bufferOut := make([]bool, len(in))
112 nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
113 enc.Put(bufferOut[:nvalid])
114 }
115
116 func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
117 return rleLengthInBytes + int64(enc.maxRleBufferSize())
118 }
119
120 func (enc *RleBooleanEncoder) maxRleBufferSize() int {
121 return utils.MaxRLEBufferSize(1, len(enc.bufferedValues)) +
122 utils.MinRLEBufferSize(1)
123 }
124
125 func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
126 rleBufferSizeMax := enc.maxRleBufferSize()
127 enc.sink.SetOffset(rleLengthInBytes)
128 enc.sink.Reserve(rleBufferSizeMax)
129
130 rleEncoder := utils.NewRleEncoder(enc.sink, 1)
131 for _, v := range enc.bufferedValues {
132 if v {
133 rleEncoder.Put(1)
134 } else {
135 rleEncoder.Put(0)
136 }
137 }
138 n := rleEncoder.Flush()
139 debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than expected max")
140 buf := enc.sink.Finish()
141 binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))
142
143 return buf, nil
144 }
145
View as plain text