1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "github.com/apache/arrow/go/v15/arrow/memory"
21 "github.com/apache/arrow/go/v15/internal/utils"
22 "github.com/apache/arrow/go/v15/parquet"
23 "golang.org/x/xerrors"
24 )
25
26
27
28
29
30
31
32
33 type DeltaByteArrayEncoder struct {
34 encoder
35
36 prefixEncoder *DeltaBitPackInt32Encoder
37 suffixEncoder *DeltaLengthByteArrayEncoder
38
39 lastVal parquet.ByteArray
40 }
41
42 func (enc *DeltaByteArrayEncoder) EstimatedDataEncodedSize() int64 {
43 return enc.prefixEncoder.EstimatedDataEncodedSize() + enc.suffixEncoder.EstimatedDataEncodedSize()
44 }
45
46 func (enc *DeltaByteArrayEncoder) initEncoders() {
47 enc.prefixEncoder = &DeltaBitPackInt32Encoder{
48 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}
49 enc.suffixEncoder = &DeltaLengthByteArrayEncoder{
50 newEncoderBase(enc.encoding, nil, enc.mem),
51 &DeltaBitPackInt32Encoder{
52 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}}
53 }
54
55
56 func (DeltaByteArrayEncoder) Type() parquet.Type { return parquet.Types.ByteArray }
57
58
59 func (enc *DeltaByteArrayEncoder) Put(in []parquet.ByteArray) {
60 if len(in) == 0 {
61 return
62 }
63
64 var suf parquet.ByteArray
65 if enc.prefixEncoder == nil {
66 enc.initEncoders()
67 enc.prefixEncoder.Put([]int32{0})
68 suf = in[0]
69 enc.lastVal = in[0]
70 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
71 in = in[1:]
72 }
73
74
75
76 for _, val := range in {
77 l1 := enc.lastVal.Len()
78 l2 := val.Len()
79 j := 0
80 for j < l1 && j < l2 {
81 if enc.lastVal[j] != val[j] {
82 break
83 }
84 j++
85 }
86 enc.prefixEncoder.Put([]int32{int32(j)})
87 suf = val[j:]
88 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
89 enc.lastVal = val
90 }
91
92
93
94
95
96 enc.lastVal = append([]byte{}, enc.lastVal...)
97 }
98
99
100
101 func (enc *DeltaByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
102 if validBits != nil {
103 data := make([]parquet.ByteArray, len(in))
104 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
105 enc.Put(data[:nvalid])
106 } else {
107 enc.Put(in)
108 }
109 }
110
111
112
113 func (enc *DeltaByteArrayEncoder) FlushValues() (Buffer, error) {
114 if enc.prefixEncoder == nil {
115 enc.initEncoders()
116 }
117 prefixBuf, err := enc.prefixEncoder.FlushValues()
118 if err != nil {
119 return nil, err
120 }
121 defer prefixBuf.Release()
122
123 suffixBuf, err := enc.suffixEncoder.FlushValues()
124 if err != nil {
125 return nil, err
126 }
127 defer suffixBuf.Release()
128
129 ret := bufferPool.Get().(*memory.Buffer)
130 ret.ResizeNoShrink(prefixBuf.Len() + suffixBuf.Len())
131 copy(ret.Bytes(), prefixBuf.Bytes())
132 copy(ret.Bytes()[prefixBuf.Len():], suffixBuf.Bytes())
133 return poolBuffer{ret}, nil
134 }
135
136
137 type DeltaByteArrayDecoder struct {
138 *DeltaLengthByteArrayDecoder
139
140 prefixLengths []int32
141 lastVal parquet.ByteArray
142 }
143
144
145 func (DeltaByteArrayDecoder) Type() parquet.Type {
146 return parquet.Types.ByteArray
147 }
148
149 func (d *DeltaByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
150
151
152
153 func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error {
154 prefixLenDec := DeltaBitPackInt32Decoder{
155 deltaBitPackDecoder: &deltaBitPackDecoder{
156 decoder: newDecoderBase(d.encoding, d.descr),
157 mem: d.mem}}
158
159 if err := prefixLenDec.SetData(nvalues, data); err != nil {
160 return err
161 }
162
163 d.prefixLengths = make([]int32, nvalues)
164
165
166 prefixLenDec.Decode(d.prefixLengths)
167
168
169
170 return d.DeltaLengthByteArrayDecoder.SetData(nvalues, data[int(prefixLenDec.bytesRead()):])
171 }
172
173
174 func (d *DeltaByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
175 max := utils.Min(len(out), d.nvals)
176 if max == 0 {
177 return 0, nil
178 }
179 out = out[:max]
180
181 var err error
182 if d.lastVal == nil {
183 _, err = d.DeltaLengthByteArrayDecoder.Decode(out[:1])
184 if err != nil {
185 return 0, err
186 }
187 d.lastVal = out[0]
188 out = out[1:]
189 d.prefixLengths = d.prefixLengths[1:]
190 }
191
192 var prefixLen int32
193 suffixHolder := make([]parquet.ByteArray, 1)
194 for len(out) > 0 {
195 prefixLen, d.prefixLengths = d.prefixLengths[0], d.prefixLengths[1:]
196
197 prefix := d.lastVal[:prefixLen:prefixLen]
198 _, err = d.DeltaLengthByteArrayDecoder.Decode(suffixHolder)
199 if err != nil {
200 return 0, err
201 }
202
203 if len(suffixHolder[0]) == 0 {
204 d.lastVal = prefix
205 } else {
206 d.lastVal = make([]byte, int(prefixLen)+len(suffixHolder[0]))
207 copy(d.lastVal, prefix)
208 copy(d.lastVal[prefixLen:], suffixHolder[0])
209 }
210 out[0], out = d.lastVal, out[1:]
211 }
212 return max, nil
213 }
214
215
216 func (d *DeltaByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
217 toread := len(out) - nullCount
218 values, err := d.Decode(out[:toread])
219 if err != nil {
220 return values, err
221 }
222 if values != toread {
223 return values, xerrors.New("parquet: number of values / definition levels read did not match")
224 }
225
226 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
227 }
228
View as plain text