1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding_test
18
19 import (
20 "encoding/binary"
21 "strconv"
22 "testing"
23
24 "github.com/apache/arrow/go/v15/arrow"
25 "github.com/apache/arrow/go/v15/arrow/memory"
26 "github.com/apache/arrow/go/v15/internal/utils"
27 "github.com/apache/arrow/go/v15/parquet"
28 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
29 "github.com/stretchr/testify/assert"
30 )
31
// generateLevels builds a deterministic level sequence for encoder tests.
// For every repetition exponent exp in [minRepeat, maxRepeat] it appends runs
// of 1<<exp copies of each value in 0, 1, 3, 7, 15, ... (values of the form
// 2^k-1) that does not exceed maxLevel. The result is a non-nil slice, empty
// when minRepeat > maxRepeat.
func generateLevels(minRepeat, maxRepeat int, maxLevel int16) []int16 {
	levels := make([]int16, 0)
	for exp := minRepeat; exp <= maxRepeat; exp++ {
		runLen := 1 << exp
		// width tracks how many bits the next candidate level needs; each
		// step produces the next all-ones value (1, 3, 7, ...).
		for width, level := 0, int16(0); level <= maxLevel; width++ {
			for n := 0; n < runLen; n++ {
				levels = append(levels, level)
			}
			level = int16((2 << width) - 1)
		}
	}
	return levels
}
52
// encodeLevels encodes input with a fresh LevelEncoder configured for enc and
// maxLvl, asserts that encoding reported no error and consumed numLevels
// values, and returns the encoded bytes.
//
// For RLE the returned bytes start with a 4-byte little-endian int32 holding
// encoder.Len() (presumably the size in bytes of the RLE payload — confirm),
// followed by the encoded levels. Any other encoding gets no prefix.
func encodeLevels(t *testing.T, enc parquet.Encoding, maxLvl int16, numLevels int, input []int16) []byte {
	var encoder encoding.LevelEncoder
	buf := encoding.NewBufferWriter(2*numLevels, memory.DefaultAllocator)

	isRLE := enc == parquet.Encodings.RLE
	if isRLE {
		// Reserve room at the front of buf for the length prefix filled in
		// below; presumably SetOffset shifts subsequent writes past it.
		buf.SetOffset(arrow.Int32SizeBytes)
	}

	encoder.Init(enc, maxLvl, buf)
	lvlCount, err := encoder.Encode(input)
	// Previously discarded: an encoding failure must fail the test.
	assert.NoError(t, err)

	if isRLE {
		// Drop the offset so Bytes() covers the reserved prefix, then write
		// the little-endian length into it.
		buf.SetOffset(0)
		arrow.Int32Traits.CastFromBytes(buf.Bytes())[0] = utils.ToLEInt32(int32(encoder.Len()))
	}

	assert.Equal(t, numLevels, lvlCount)
	return buf.Bytes()
}
75
// verifyDecodingLvls asserts that buf decodes back to input. After a single
// SetData call it reads the levels through decodeCount (4) equally sized
// Decode calls, then one call for whatever did not divide evenly, and finally
// checks that one more Decode returns no levels once everything is consumed.
//
// input must be non-empty: the final Decode slices the scratch buffer to
// length 1, which panics when the buffer has length 0.
func verifyDecodingLvls(t *testing.T, enc parquet.Encoding, maxLvl int16, input []int16, buf []byte) {
	const decodeCount = 4

	var decoder encoding.LevelDecoder
	total := len(input)
	chunk := total / decodeCount
	scratch := make([]int16, total)

	_, err := decoder.SetData(enc, maxLvl, total, buf)
	assert.NoError(t, err)

	// Consecutive Decode calls against one SetData must continue where the
	// previous call stopped.
	for i := 0; i < decodeCount; i++ {
		start := i * chunk
		n, _ := decoder.Decode(scratch[:chunk])
		assert.Equal(t, chunk, n)
		assert.Equal(t, input[start:start+chunk], scratch[:chunk])
	}

	// Levels that did not fit evenly into the chunks above.
	done := decodeCount * chunk
	if leftover := total - done; leftover > 0 {
		n, _ := decoder.Decode(scratch[:leftover])
		assert.Equal(t, leftover, n)
		assert.Equal(t, input[done:], scratch[:leftover])
	}

	// Everything has been read, so nothing more should come out.
	n, _ := decoder.Decode(scratch[:1])
	assert.Zero(t, n)
}
112
// verifyDecodingMultipleSetData checks that a single LevelDecoder can be
// re-armed with SetData for each buffer in buf. input is split into len(buf)
// equal chunks of len(input)/len(buf) levels (any trailing remainder is not
// checked), and chunk ct must be what buf[ct] decodes to.
func verifyDecodingMultipleSetData(t *testing.T, enc parquet.Encoding, maxLvl int16, input []int16, buf [][]byte) {
	// With no buffers there is nothing to verify, and the division below
	// would otherwise panic with a divide-by-zero.
	if !assert.NotEmpty(t, buf, "no encoded buffers to verify") {
		return
	}

	var (
		decoder   encoding.LevelDecoder
		numLevels = len(input) / len(buf)
		output    = make([]int16, numLevels)
	)

	for ct, data := range buf {
		offset := ct * numLevels
		_, err := decoder.SetData(enc, maxLvl, numLevels, data)
		assert.NoError(t, err)
		lvlCount, _ := decoder.Decode(output)
		assert.Equal(t, numLevels, lvlCount)
		assert.Equal(t, input[offset:offset+numLevels], output)
	}
}
132
// TestLevelsDecodeMultipleBitWidth round-trips generated level sequences
// through each level encoding for max levels needing 1 to 8 bits, decoding
// each buffer with several Decode calls per SetData.
func TestLevelsDecodeMultipleBitWidth(t *testing.T) {
	t.Parallel()

	const (
		maxRepeat   = 7
		maxBitWidth = 8
	)
	encodings := [2]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked}

	for _, enc := range encodings {
		// Computed per encoding instead of mutating a variable shared by all
		// subtests, so the outcome no longer depends on subtest order.
		minRepeat := 0
		if enc == parquet.Encodings.BitPacked {
			// 1<<3 = 8: presumably bit-packed levels need runs of at least
			// 8 values — TODO confirm.
			minRepeat = 3
		}

		t.Run(enc.String(), func(t *testing.T) {
			for bitWidth := 1; bitWidth <= maxBitWidth; bitWidth++ {
				t.Run(strconv.Itoa(bitWidth), func(t *testing.T) {
					// Largest level representable in bitWidth bits.
					maxLvl := int16((1 << bitWidth) - 1)
					input := generateLevels(minRepeat, maxRepeat, maxLvl)

					var buf []byte
					assert.NotPanics(t, func() {
						buf = encodeLevels(t, enc, maxLvl, len(input), input)
					})
					assert.NotPanics(t, func() {
						verifyDecodingLvls(t, enc, maxLvl, input, buf)
					})
				})
			}
		})
	}
}
169
// TestLevelsDecodeMultipleSetData encodes consecutive equally sized slices of
// a generated 8-bit level sequence into separate buffers, then checks that one
// LevelDecoder decodes each buffer after its own SetData call.
func TestLevelsDecodeMultipleSetData(t *testing.T) {
	t.Parallel()

	const (
		minRepeat     = 3
		maxRepeat     = 7
		bitWidth      = 8
		setdataFactor = 8
	)
	maxLevel := int16((1 << bitWidth) - 1)
	input := generateLevels(minRepeat, maxRepeat, maxLevel)

	// Levels beyond chunkSize*setdataFactor are left unencoded.
	chunkSize := len(input) / setdataFactor
	chunks := make([][]byte, setdataFactor)

	for _, enc := range [...]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked} {
		t.Run(enc.String(), func(t *testing.T) {
			for i := range chunks {
				start := i * chunkSize
				assert.NotPanics(t, func() {
					chunks[i] = encodeLevels(t, enc, maxLevel, chunkSize, input[start:start+chunkSize])
				})
			}
			assert.NotPanics(t, func() {
				verifyDecodingMultipleSetData(t, enc, maxLevel, input, chunks)
			})
		})
	}
}
204
// TestMinimumBufferSize RLE-encodes 1024 levels with max level 1 — 0 at every
// index divisible by 9, 1 everywhere else — into a BufferWriter created with
// an initial size argument of 0, and checks that every level is encoded
// without error.
func TestMinimumBufferSize(t *testing.T) {
	t.Parallel()

	const numToEncode = 1024
	levels := make([]int16, numToEncode)
	for idx := range levels {
		// Levels start at 0, so only the non-multiples of 9 need setting.
		if idx%9 != 0 {
			levels[idx] = 1
		}
	}

	output := encoding.NewBufferWriter(0, memory.DefaultAllocator)

	var encoder encoding.LevelEncoder
	encoder.Init(parquet.Encodings.RLE, 1, output)
	count, err := encoder.Encode(levels)
	assert.NoError(t, err)
	assert.Equal(t, numToEncode, count)
}
226
// TestMinimumBufferSize2 RLE-encodes 1024 levels laid out in blocks of 16
// (seven 0s followed by nine 1s) for every level bit width from 1 to 8, each
// time into a BufferWriter created with an initial size argument of 0, and
// checks that every level is encoded without error.
func TestMinimumBufferSize2(t *testing.T) {
	t.Parallel()

	const numToEncode = 1024
	levels := make([]int16, numToEncode)
	for idx := range levels {
		// Alternating short runs of 0s and 1s; presumably this forces the RLE
		// encoder to switch between literal and repeated runs.
		if idx%16 >= 7 {
			levels[idx] = 1
		}
	}

	for bitWidth := 1; bitWidth <= 8; bitWidth++ {
		// Init takes a max level, not a bit width. Passing bitWidth directly
		// only exercised widths 1..4; use the largest level of that width.
		maxLvl := int16((1 << bitWidth) - 1)

		output := encoding.NewBufferWriter(0, memory.DefaultAllocator)

		var encoder encoding.LevelEncoder
		encoder.Init(parquet.Encodings.RLE, maxLvl, output)
		count, err := encoder.Encode(levels)
		assert.NoError(t, err)
		assert.Equal(t, numToEncode, count)
	}
}
257
// TestEncodeDecodeLevels round-trips 2048 levels with max level 1 through RLE.
// The encoded bytes are prefixed with their length as a 4-byte little-endian
// uint32 before being passed to LevelDecoder.SetData. The decoder must return
// all levels unchanged, along with a second count equal to the number of 1s
// (presumably the number of levels equal to the max level).
func TestEncodeDecodeLevels(t *testing.T) {
	t.Parallel()

	const numToEncode = 2048
	levels := make([]int16, numToEncode)
	numones := 0
	for idx := range levels {
		if (idx % 16) < 7 {
			levels[idx] = 0
		} else {
			levels[idx] = 1
			numones++
		}
	}

	output := encoding.NewBufferWriter(0, memory.DefaultAllocator)

	var encoder encoding.LevelEncoder
	encoder.Init(parquet.Encodings.RLE, 1, output)
	count, err := encoder.Encode(levels)
	assert.NoError(t, err)
	assert.Equal(t, numToEncode, count)
	encoder.Flush()

	// Build the RLE level layout handed to SetData: length prefix + payload.
	buf := output.Bytes()
	var prefix [4]byte
	binary.LittleEndian.PutUint32(prefix[:], uint32(len(buf)))

	var decoder encoding.LevelDecoder
	_, err = decoder.SetData(parquet.Encodings.RLE, 1, numToEncode, append(prefix[:], buf...))
	assert.NoError(t, err)

	var levelOut [numToEncode]int16
	total, vals := decoder.Decode(levelOut[:])
	assert.EqualValues(t, numToEncode, total)
	assert.EqualValues(t, numones, vals)
	assert.Equal(t, levels, levelOut[:])
}
294
View as plain text