...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "errors"
23 "fmt"
24 "math/bits"
25
26 "github.com/JohnCGriffin/overflow"
27 "github.com/apache/arrow/go/v15/arrow/bitutil"
28 shared_utils "github.com/apache/arrow/go/v15/internal/utils"
29 "github.com/apache/arrow/go/v15/parquet"
30 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
31 "github.com/apache/arrow/go/v15/parquet/internal/utils"
32 )
33
34
35
36 type LevelEncoder struct {
37 bitWidth int
38 rleLen int
39 encoding format.Encoding
40 rle *utils.RleEncoder
41 bit *utils.BitWriter
42 }
43
44
45
46 func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffered int) int {
47 bitWidth := bits.Len64(uint64(maxLvl))
48 nbytes := 0
49 switch encoding {
50 case parquet.Encodings.RLE:
51 nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) + utils.MinRLEBufferSize(bitWidth)
52 case parquet.Encodings.BitPacked:
53 nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
54 default:
55 panic("parquet: unknown encoding type for levels")
56 }
57 return nbytes
58 }
59
60
61
62 func (l *LevelEncoder) Reset(maxLvl int16) {
63 l.bitWidth = bits.Len64(uint64(maxLvl))
64 switch l.encoding {
65 case format.Encoding_RLE:
66 l.rle.Clear()
67 l.rle.BitWidth = l.bitWidth
68 case format.Encoding_BIT_PACKED:
69 l.bit.Clear()
70 default:
71 panic("parquet: unknown encoding type")
72 }
73 }
74
75
76
77 func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w utils.WriterAtWithLen) {
78 l.bitWidth = bits.Len64(uint64(maxLvl))
79 l.encoding = format.Encoding(encoding)
80 switch l.encoding {
81 case format.Encoding_RLE:
82 l.rle = utils.NewRleEncoder(w, l.bitWidth)
83 case format.Encoding_BIT_PACKED:
84 l.bit = utils.NewBitWriter(w)
85 default:
86 panic("parquet: unknown encoding type for levels")
87 }
88 }
89
90
91
92
93
94 func (l *LevelEncoder) EncodeNoFlush(lvls []int16) (nencoded int, err error) {
95 if l.rle == nil && l.bit == nil {
96 panic("parquet: level encoders are not initialized")
97 }
98
99 switch l.encoding {
100 case format.Encoding_RLE:
101 for _, level := range lvls {
102 if err = l.rle.Put(uint64(level)); err != nil {
103 return
104 }
105 nencoded++
106 }
107 default:
108 for _, level := range lvls {
109 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
110 return
111 }
112 nencoded++
113 }
114 }
115 return
116 }
117
118
119 func (l *LevelEncoder) Flush() {
120 if l.rle == nil && l.bit == nil {
121 panic("parquet: level encoders are not initialized")
122 }
123
124 switch l.encoding {
125 case format.Encoding_RLE:
126 l.rleLen = l.rle.Flush()
127 default:
128 l.bit.Flush(false)
129 }
130 }
131
132
133
134
135 func (l *LevelEncoder) Encode(lvls []int16) (nencoded int, err error) {
136 if l.rle == nil && l.bit == nil {
137 panic("parquet: level encoders are not initialized")
138 }
139
140 switch l.encoding {
141 case format.Encoding_RLE:
142 defer func() { l.rleLen = l.rle.Flush() }()
143 for _, level := range lvls {
144 if err = l.rle.Put(uint64(level)); err != nil {
145 return
146 }
147 nencoded++
148 }
149
150 default:
151 defer l.bit.Flush(false)
152 for _, level := range lvls {
153 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
154 return
155 }
156 nencoded++
157 }
158 }
159 return
160 }
161
162
163
164
165 func (l *LevelEncoder) Len() int {
166 if l.encoding != format.Encoding_RLE {
167 panic("parquet: level encoder, only implemented for RLE")
168 }
169 return l.rleLen
170 }
171
172
173
174 type LevelDecoder struct {
175 bitWidth int
176 remaining int
177 maxLvl int16
178 encoding format.Encoding
179
180
181 rle *utils.RleDecoder
182 bit *utils.BitReader
183 }
184
185
186
187
188 func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffered int, data []byte) (int, error) {
189 l.maxLvl = maxLvl
190 l.encoding = format.Encoding(encoding)
191 l.remaining = nbuffered
192 l.bitWidth = bits.Len64(uint64(maxLvl))
193
194 switch encoding {
195 case parquet.Encodings.RLE:
196 if len(data) < 4 {
197 return 0, errors.New("parquet: received invalid levels (corrupt data page?)")
198 }
199
200 nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
201 if nbytes < 0 || nbytes > int32(len(data)-4) {
202 return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
203 }
204
205 buf := data[4:]
206 if l.rle == nil {
207 l.rle = utils.NewRleDecoder(bytes.NewReader(buf), l.bitWidth)
208 } else {
209 l.rle.Reset(bytes.NewReader(buf), l.bitWidth)
210 }
211 return int(nbytes) + 4, nil
212 case parquet.Encodings.BitPacked:
213 nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
214 if !ok {
215 return 0, errors.New("parquet: number of buffered values too large (corrupt data page?)")
216 }
217
218 nbytes := bitutil.BytesForBits(int64(nbits))
219 if nbytes < 0 || nbytes > int64(len(data)) {
220 return 0, errors.New("parquet: received invalid number of bytes (corrupt data page?)")
221 }
222 if l.bit == nil {
223 l.bit = utils.NewBitReader(bytes.NewReader(data))
224 } else {
225 l.bit.Reset(bytes.NewReader(data))
226 }
227 return int(nbytes), nil
228 default:
229 return 0, fmt.Errorf("parquet: unknown encoding type for levels '%s'", encoding)
230 }
231 }
232
233
234
235 func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
236 if nbytes < 0 {
237 return errors.New("parquet: invalid page header (corrupt data page?)")
238 }
239
240 l.maxLvl = maxLvl
241 l.encoding = format.Encoding_RLE
242 l.remaining = nbuffered
243 l.bitWidth = bits.Len64(uint64(maxLvl))
244
245 if l.rle == nil {
246 l.rle = utils.NewRleDecoder(bytes.NewReader(data), l.bitWidth)
247 } else {
248 l.rle.Reset(bytes.NewReader(data), l.bitWidth)
249 }
250 return nil
251 }
252
253
254
255
256
257 func (l *LevelDecoder) Decode(levels []int16) (int, int64) {
258 var (
259 buf [1024]uint64
260 totaldecoded int
261 decoded int
262 valsToRead int64
263 )
264
265 n := shared_utils.Min(int64(l.remaining), int64(len(levels)))
266 for n > 0 {
267 batch := shared_utils.Min(1024, n)
268 switch l.encoding {
269 case format.Encoding_RLE:
270 decoded = l.rle.GetBatch(buf[:batch])
271 case format.Encoding_BIT_PACKED:
272 decoded, _ = l.bit.GetBatch(uint(l.bitWidth), buf[:batch])
273 }
274 l.remaining -= decoded
275 totaldecoded += decoded
276 n -= batch
277
278 for idx, val := range buf[:decoded] {
279 lvl := int16(val)
280 levels[idx] = lvl
281 if lvl == l.maxLvl {
282 valsToRead++
283 }
284 }
285 levels = levels[decoded:]
286 }
287
288 return totaldecoded, valsToRead
289 }
290
View as plain text