1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "bytes"
21 "reflect"
22
23 "github.com/apache/arrow/go/v15/arrow/array"
24 "github.com/apache/arrow/go/v15/arrow/bitutil"
25 "github.com/apache/arrow/go/v15/arrow/memory"
26 "github.com/apache/arrow/go/v15/internal/bitutils"
27 shared_utils "github.com/apache/arrow/go/v15/internal/utils"
28 "github.com/apache/arrow/go/v15/parquet"
29 "github.com/apache/arrow/go/v15/parquet/internal/debug"
30 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
31 "github.com/apache/arrow/go/v15/parquet/internal/utils"
32 "github.com/apache/arrow/go/v15/parquet/schema"
33 "golang.org/x/xerrors"
34 )
35
36
37
38 type DecoderTraits interface {
39 Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder
40 BytesRequired(int) int
41 }
42
43
44 func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder {
45 traits := getDecodingTraits(t)
46 if traits == nil {
47 return nil
48 }
49
50 return traits.Decoder(e, descr, false , mem)
51 }
52
53
54
55
56 func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder {
57 traits := getDecodingTraits(t)
58 if traits == nil {
59 return nil
60 }
61
62 if mem == nil {
63 mem = memory.DefaultAllocator
64 }
65
66 return traits.Decoder(parquet.Encodings.RLEDict, descr, true , mem).(DictDecoder)
67 }
68
69 type decoder struct {
70 descr *schema.Column
71 encoding format.Encoding
72 nvals int
73 data []byte
74 typeLen int
75 }
76
77
78
79 func newDecoderBase(e format.Encoding, descr *schema.Column) decoder {
80 typeLen := -1
81 if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray {
82 typeLen = int(descr.TypeLength())
83 }
84
85 return decoder{
86 descr: descr,
87 encoding: e,
88 typeLen: typeLen,
89 }
90 }
91
92
93
94 func (d *decoder) SetData(nvals int, data []byte) error {
95 d.data = data
96 d.nvals = nvals
97 return nil
98 }
99
100
101 func (d *decoder) ValuesLeft() int { return d.nvals }
102
103
104 func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) }
105
106 type dictDecoder struct {
107 decoder
108 mem memory.Allocator
109 dictValueDecoder utils.DictionaryConverter
110 idxDecoder *utils.RleDecoder
111
112 idxScratchSpace []uint64
113 }
114
115
116
117 func (d *dictDecoder) SetDict(dict TypedDecoder) {
118 if dict.Type() != d.descr.PhysicalType() {
119 panic("parquet: mismatch dictionary and column data type")
120 }
121
122 d.dictValueDecoder = NewDictConverter(dict)
123 }
124
125
126 func (d *dictDecoder) SetData(nvals int, data []byte) error {
127 d.nvals = nvals
128 if len(data) == 0 {
129
130 d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 )
131 return nil
132 }
133
134
135 width := uint8(data[0])
136 if width >= 64 {
137 return xerrors.New("parquet: invalid or corrupted bit width")
138 }
139
140
141 d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width))
142 return nil
143 }
144
145 func (d *dictDecoder) decode(out interface{}) (int, error) {
146 n, err := d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out)
147 d.nvals -= n
148 return n, err
149 }
150
151 func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
152 n, err := d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset)
153 d.nvals -= n
154 return n, err
155 }
156
157 func (d *dictDecoder) DecodeIndices(numValues int, bldr array.Builder) (int, error) {
158 n := shared_utils.Min(numValues, d.nvals)
159 if cap(d.idxScratchSpace) < n {
160 d.idxScratchSpace = make([]uint64, n, bitutil.NextPowerOf2(n))
161 } else {
162 d.idxScratchSpace = d.idxScratchSpace[:n]
163 }
164
165 n = d.idxDecoder.GetBatch(d.idxScratchSpace)
166
167 toAppend := make([]int, n)
168 for i, v := range d.idxScratchSpace {
169 toAppend[i] = int(v)
170 }
171 bldr.(*array.BinaryDictionaryBuilder).AppendIndices(toAppend, nil)
172 d.nvals -= n
173 return n, nil
174 }
175
176 func (d *dictDecoder) DecodeIndicesSpaced(numValues, nullCount int, validBits []byte, offset int64, bldr array.Builder) (int, error) {
177 if cap(d.idxScratchSpace) < numValues {
178 d.idxScratchSpace = make([]uint64, numValues, bitutil.NextPowerOf2(numValues))
179 } else {
180 d.idxScratchSpace = d.idxScratchSpace[:numValues]
181 }
182
183 n, err := d.idxDecoder.GetBatchSpaced(d.idxScratchSpace, nullCount, validBits, offset)
184 if err != nil {
185 return n, err
186 }
187
188 valid := make([]bool, n)
189 bitutils.VisitBitBlocks(validBits, offset, int64(n),
190 func(pos int64) { valid[pos] = true }, func() {})
191
192 toAppend := make([]int, n)
193 for i, v := range d.idxScratchSpace {
194 toAppend[i] = int(v)
195 }
196 bldr.(*array.BinaryDictionaryBuilder).AppendIndices(toAppend, valid)
197 d.nvals -= n - nullCount
198 return n, nil
199 }
200
201
202
203
204 func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int {
205 bufferRef := reflect.ValueOf(buffer)
206 if bufferRef.Kind() != reflect.Slice {
207 panic("invalid spacedexpand type, not slice")
208 }
209
210 var (
211 numValues int = bufferRef.Len()
212 )
213
214 idxDecode := int64(numValues - nullCount)
215 if idxDecode == 0 {
216 return numValues
217 }
218
219
220 rdr := bitutils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(numValues))
221 for {
222 run := rdr.NextRun()
223 if run.Length == 0 {
224 break
225 }
226
227
228
229
230
231
232 idxDecode -= run.Length
233 n := reflect.Copy(bufferRef.Slice(int(run.Pos), bufferRef.Len()), bufferRef.Slice(int(idxDecode), int(int64(idxDecode)+run.Length)))
234 debug.Assert(n == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand")
235 }
236
237 return numValues
238 }
239
View as plain text