1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package testutils
18
19 import (
20 "encoding/binary"
21 "fmt"
22 "io"
23 "reflect"
24
25 "github.com/apache/arrow/go/v15/arrow/memory"
26 "github.com/apache/arrow/go/v15/internal/utils"
27 "github.com/apache/arrow/go/v15/parquet"
28 "github.com/apache/arrow/go/v15/parquet/compress"
29 "github.com/apache/arrow/go/v15/parquet/file"
30 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
31 "github.com/apache/arrow/go/v15/parquet/schema"
32 "github.com/stretchr/testify/mock"
33 )
34
// DataPageBuilder accumulates the encoded bytes of a single data page
// (repetition levels, then definition levels, then values) into sink,
// tracking the metadata needed to construct a file.DataPage afterwards.
type DataPageBuilder struct {
	sink    io.Writer
	version parquet.DataPageVersion

	nvals          int              // max count of values/levels appended so far
	encoding       parquet.Encoding // encoding used for the values section
	defLvlEncoding parquet.Encoding // encoding used for definition levels (RLE only)
	repLvlEncoding parquet.Encoding // encoding used for repetition levels (RLE only)
	defLvlBytesLen int              // encoded byte length of the definition levels
	repLvlBytesLen int              // encoded byte length of the repetition levels
	hasDefLvls     bool
	hasRepLvls     bool
	hasValues      bool
}
49
50 var mem = memory.NewGoAllocator()
51
52 func (d *DataPageBuilder) appendLevels(lvls []int16, maxLvl int16, e parquet.Encoding) int {
53 if e != parquet.Encodings.RLE {
54 panic("parquet: only rle encoding currently implemented")
55 }
56
57 buf := encoding.NewBufferWriter(encoding.LevelEncodingMaxBufferSize(e, maxLvl, len(lvls)), memory.DefaultAllocator)
58 var enc encoding.LevelEncoder
59 enc.Init(e, maxLvl, buf)
60 enc.Encode(lvls)
61
62 rleBytes := enc.Len()
63 if d.version == parquet.DataPageV1 {
64 if err := binary.Write(d.sink, binary.LittleEndian, int32(rleBytes)); err != nil {
65 panic(err)
66 }
67 }
68
69 if _, err := d.sink.Write(buf.Bytes()[:rleBytes]); err != nil {
70 panic(err)
71 }
72 return rleBytes
73 }
74
75 func (d *DataPageBuilder) AppendDefLevels(lvls []int16, maxLvl int16) {
76 d.defLvlBytesLen = d.appendLevels(lvls, maxLvl, parquet.Encodings.RLE)
77
78 d.nvals = utils.Max(len(lvls), d.nvals)
79 d.defLvlEncoding = parquet.Encodings.RLE
80 d.hasDefLvls = true
81 }
82
83 func (d *DataPageBuilder) AppendRepLevels(lvls []int16, maxLvl int16) {
84 d.repLvlBytesLen = d.appendLevels(lvls, maxLvl, parquet.Encodings.RLE)
85
86 d.nvals = utils.Max(len(lvls), d.nvals)
87 d.repLvlEncoding = parquet.Encodings.RLE
88 d.hasRepLvls = true
89 }
90
91 func (d *DataPageBuilder) AppendValues(desc *schema.Column, values interface{}, e parquet.Encoding) {
92 enc := encoding.NewEncoder(desc.PhysicalType(), e, false, desc, mem)
93 var sz int
94 switch v := values.(type) {
95 case []bool:
96 enc.(encoding.BooleanEncoder).Put(v)
97 sz = len(v)
98 case []int32:
99 enc.(encoding.Int32Encoder).Put(v)
100 sz = len(v)
101 case []int64:
102 enc.(encoding.Int64Encoder).Put(v)
103 sz = len(v)
104 case []parquet.Int96:
105 enc.(encoding.Int96Encoder).Put(v)
106 sz = len(v)
107 case []float32:
108 enc.(encoding.Float32Encoder).Put(v)
109 sz = len(v)
110 case []float64:
111 enc.(encoding.Float64Encoder).Put(v)
112 sz = len(v)
113 case []parquet.ByteArray:
114 enc.(encoding.ByteArrayEncoder).Put(v)
115 sz = len(v)
116 default:
117 panic(fmt.Sprintf("no testutil data page builder for type %T", values))
118 }
119 buf, _ := enc.FlushValues()
120 _, err := d.sink.Write(buf.Bytes())
121 if err != nil {
122 panic(err)
123 }
124
125 d.nvals = utils.Max(sz, d.nvals)
126 d.encoding = e
127 d.hasValues = true
128 }
129
// DictionaryPageBuilder accumulates values into a dictionary encoder so that
// a dictionary page plus per-batch RLE index buffers can be produced.
type DictionaryPageBuilder struct {
	traits        encoding.DictEncoder // underlying dictionary encoder for the column type
	numDictValues int32                // number of unique dictionary entries so far
	hasValues     bool                 // whether any values have been appended
}
135
136 func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder {
137 return &DictionaryPageBuilder{
138 encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem).(encoding.DictEncoder),
139 0, false}
140 }
141
142 func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer {
143 switch v := values.(type) {
144 case []int32:
145 d.traits.(encoding.Int32Encoder).Put(v)
146 case []int64:
147 d.traits.(encoding.Int64Encoder).Put(v)
148 case []parquet.Int96:
149 d.traits.(encoding.Int96Encoder).Put(v)
150 case []float32:
151 d.traits.(encoding.Float32Encoder).Put(v)
152 case []float64:
153 d.traits.(encoding.Float64Encoder).Put(v)
154 case []parquet.ByteArray:
155 d.traits.(encoding.ByteArrayEncoder).Put(v)
156 default:
157 panic(fmt.Sprintf("no testutil dictionary page builder for type %T", values))
158 }
159
160 d.numDictValues = int32(d.traits.NumEntries())
161 d.hasValues = true
162 buf, _ := d.traits.FlushValues()
163 return buf
164 }
165
166 func (d *DictionaryPageBuilder) WriteDict() *memory.Buffer {
167 buf := memory.NewBufferBytes(make([]byte, d.traits.DictEncodedSize()))
168 d.traits.WriteDict(buf.Bytes())
169 return buf
170 }
171
// NumValues returns the number of unique dictionary entries accumulated so far.
func (d *DictionaryPageBuilder) NumValues() int32 {
	return d.numDictValues
}
175
176 func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, defLvls, repLvls []int16, maxDef, maxRep int16) file.Page {
177 num := 0
178
179 stream := encoding.NewBufferWriter(1024, mem)
180 builder := DataPageBuilder{sink: stream, version: dataPageVersion}
181
182 if len(repLvls) > 0 {
183 builder.AppendRepLevels(repLvls, maxRep)
184 }
185 if len(defLvls) > 0 {
186 builder.AppendDefLevels(defLvls, maxDef)
187 }
188
189 if e == parquet.Encodings.Plain {
190 builder.AppendValues(d, values, e)
191 num = builder.nvals
192 } else {
193 stream.Write(indexBuffer.Bytes())
194 num = utils.Max(builder.nvals, nvals)
195 }
196
197 buf := stream.Finish()
198 if dataPageVersion == parquet.DataPageV1 {
199 return file.NewDataPageV1(buf, int32(num), e, builder.defLvlEncoding, builder.repLvlEncoding, int32(buf.Len()))
200 }
201 return file.NewDataPageV2(buf, int32(num), 0, int32(num), e, int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int32(buf.Len()), false)
202 }
203
204 func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) {
205 bldr := NewDictionaryPageBuilder(d)
206 npages := len(valuesPerPage)
207
208 ref := reflect.ValueOf(values)
209 valStart := 0
210
211 rleIndices := make([]encoding.Buffer, 0, npages)
212 for _, nvals := range valuesPerPage {
213 rleIndices = append(rleIndices, bldr.AppendValues(ref.Slice(valStart, valStart+nvals).Interface()))
214 valStart += nvals
215 }
216
217 buffer := bldr.WriteDict()
218 return file.NewDictionaryPage(buffer, bldr.NumValues(), parquet.Encodings.Plain), rleIndices
219 }
220
// MockPageReader is a testify-based mock page reader that serves a
// pre-registered slice of pages stored in its TestData under the "pages" key.
type MockPageReader struct {
	mock.Mock

	curpage int // 1-based index of the page most recently advanced to by Next
}
226
// Err returns the mocked error value configured via testify expectations.
func (m *MockPageReader) Err() error {
	return m.Called().Error(0)
}
230
// Reset is a no-op; the mock's page list is fixed at construction time.
func (m *MockPageReader) Reset(parquet.BufferedReader, int64, compress.Compression, *file.CryptoContext) {
}
233
234 func (m *MockPageReader) SetMaxPageHeaderSize(int) {}
235
// Page returns the page most recently advanced to by Next. Calling it
// before a successful Next (curpage == 0) will panic on the -1 index.
func (m *MockPageReader) Page() file.Page {
	return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1]
}
239
240 func (m *MockPageReader) Next() bool {
241 pageList := m.TestData().Get("pages").Data().([]file.Page)
242 m.curpage++
243 return len(pageList) >= m.curpage
244 }
245
246 func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16,
247 maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
248
249 var (
250 npages = len(valuesPerPage)
251 defLvlStart = 0
252 defLvlEnd = 0
253 repLvlStart = 0
254 repLvlEnd = 0
255 valueStart = 0
256 )
257
258 pageList := make([]file.Page, 0, npages)
259 for i := 0; i < npages; i++ {
260 if maxDef > 0 {
261 defLvlStart = i * lvlsPerPage
262 defLvlEnd = (i + 1) * lvlsPerPage
263 }
264 if maxRep > 0 {
265 repLvlStart = i * lvlsPerPage
266 repLvlEnd = (i + 1) * lvlsPerPage
267 }
268
269 page := MakeDataPage(version, d,
270 values.Slice(valueStart, valueStart+valuesPerPage[i]).Interface(),
271 valuesPerPage[i], enc, nil, defLevels[defLvlStart:defLvlEnd],
272 repLevels[repLvlStart:repLvlEnd], maxDef, maxRep)
273 valueStart += valuesPerPage[i]
274 pageList = append(pageList, page)
275 }
276 return pageList
277 }
278
279 func PaginateDict(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
280 var (
281 npages = len(valuesPerPage)
282 pages = make([]file.Page, 0, npages)
283 defStart = 0
284 defEnd = 0
285 repStart = 0
286 repEnd = 0
287 )
288
289 dictPage, rleIndices := MakeDictPage(d, values.Interface(), valuesPerPage, enc)
290 pages = append(pages, dictPage)
291 for i := 0; i < npages; i++ {
292 if maxDef > 0 {
293 defStart = i * lvlsPerPage
294 defEnd = (i + 1) * lvlsPerPage
295 }
296 if maxRep > 0 {
297 repStart = i * lvlsPerPage
298 repEnd = (i + 1) * lvlsPerPage
299 }
300 page := MakeDataPage(version, d, nil, valuesPerPage[i], enc, rleIndices[i],
301 defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep)
302 pages = append(pages, page)
303 }
304 return pages
305 }
306
View as plain text