...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "fmt"
21 "math"
22 "math/bits"
23 "unsafe"
24
25 shared_utils "github.com/apache/arrow/go/v15/internal/utils"
26 "github.com/apache/arrow/go/v15/parquet"
27 "github.com/apache/arrow/go/v15/parquet/internal/bmi"
28 "github.com/apache/arrow/go/v15/parquet/internal/utils"
29 "github.com/apache/arrow/go/v15/parquet/schema"
30 "golang.org/x/xerrors"
31 )
32
// LevelInfo holds the definition/repetition level thresholds used when
// converting Parquet levels into validity bitmaps and list offsets.
type LevelInfo struct {
	// NullSlotUsage is checked by DefRepLevelsToListInfo: that function
	// reports an error when nulls were counted and this value is greater
	// than 1.
	// NOTE(review): presumably the number of output slots one null entry
	// occupies; confirm against the callers that populate it.
	NullSlotUsage int32

	// DefLevel is the definition-level threshold for this field.
	// DefRepLevelsToListInfo counts an entry as contributing an element
	// when its definition level is >= DefLevel, and marks the entry valid
	// when the level is >= DefLevel-1. defLevelsBatchToBitmap builds its
	// "defined" bitmap by comparing each level against DefLevel-1.
	DefLevel int16

	// RepLevel is the repetition-level threshold for this field.
	// In DefRepLevelsToListInfo, entries with a repetition level greater
	// than RepLevel are skipped, entries equal to it extend the current
	// offset, and entries below it begin a new offset slot.
	// DefLevelsToBitmap treats RepLevel > 0 as "has a repeated parent".
	RepLevel int16

	// RepeatedAncestorDefLevel is set by IncrementRepeated to the
	// definition level reached after that increment. Entries whose
	// definition level is below this value are skipped by
	// DefRepLevelsToListInfo and filtered out of the bitmap by
	// defLevelsBatchToBitmap when a repeated parent is present.
	RepeatedAncestorDefLevel int16
}
74
75 func (l *LevelInfo) Equal(rhs *LevelInfo) bool {
76 return l.NullSlotUsage == rhs.NullSlotUsage &&
77 l.DefLevel == rhs.DefLevel &&
78 l.RepLevel == rhs.RepLevel &&
79 l.RepeatedAncestorDefLevel == rhs.RepeatedAncestorDefLevel
80 }
81
82 func (l *LevelInfo) HasNullableValues() bool {
83 return l.RepeatedAncestorDefLevel < l.DefLevel
84 }
85
86 func (l *LevelInfo) IncrementOptional() {
87 l.DefLevel++
88 }
89
90 func (l *LevelInfo) IncrementRepeated() int16 {
91 lastRepAncestor := l.RepeatedAncestorDefLevel
92
93
94 l.RepLevel++
95 l.DefLevel++
96
97
98
99
100
101
102 l.RepeatedAncestorDefLevel = l.DefLevel
103 return lastRepAncestor
104 }
105
106 func (l *LevelInfo) Increment(n schema.Node) {
107 switch n.RepetitionType() {
108 case parquet.Repetitions.Repeated:
109 l.IncrementRepeated()
110 case parquet.Repetitions.Optional:
111 l.IncrementOptional()
112 }
113 }
114
115
// ValidityBitmapInputOutput bundles the inputs and results of the
// level-to-bitmap conversion functions in this file.
type ValidityBitmapInputOutput struct {
	// ReadUpperBound is an input: the maximum number of entries the
	// conversion may produce. It is passed as the length of the bitmap
	// writer and used for the "exceeded upper bound" checks.
	ReadUpperBound int64

	// Read is an output: the number of entries produced, taken from the
	// bitmap writer position or, in DefRepLevelsToListInfo when offsets
	// are supplied, from the final offset index.
	Read int64

	// NullCount is an input/output: the conversions add the number of
	// null entries they encounter to whatever value is already stored.
	NullCount int64

	// ValidBits is the destination validity bitmap. DefRepLevelsToListInfo
	// skips bitmap output entirely when it is nil.
	ValidBits []byte

	// ValidBitsOffset is passed to the bitmap writer as the starting
	// offset within ValidBits.
	// NOTE(review): presumably a bit offset; confirm against
	// utils.NewFirstTimeBitmapWriter.
	ValidBitsOffset int64
}
136
137
138 func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) (count uint64) {
139 const maxbatch = 8 * int(unsafe.Sizeof(uint64(0)))
140
141 if !hasRepeatedParent && int64(len(defLevels)) > remainingUpperBound {
142 panic("values read exceed upper bound")
143 }
144
145 var batch []int16
146 for len(defLevels) > 0 {
147 batchSize := shared_utils.Min(maxbatch, len(defLevels))
148 batch, defLevels = defLevels[:batchSize], defLevels[batchSize:]
149 definedBitmap := bmi.GreaterThanBitmap(batch, info.DefLevel-1)
150
151 if hasRepeatedParent {
152
153
154 presentBitmap := bmi.GreaterThanBitmap(batch, info.RepeatedAncestorDefLevel-1)
155 selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap)
156 selectedCount := int64(bits.OnesCount64(presentBitmap))
157 if selectedCount > remainingUpperBound {
158 panic("values read exceeded upper bound")
159 }
160 wr.AppendWord(selectedBits, selectedCount)
161 count += uint64(bits.OnesCount64(selectedBits))
162 continue
163 }
164
165 wr.AppendWord(definedBitmap, int64(len(batch)))
166 count += uint64(bits.OnesCount64(definedBitmap))
167 }
168 return
169 }
170
171
172 func defLevelsToBitmapInternal(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput, hasRepeatedParent bool) {
173 wr := utils.NewFirstTimeBitmapWriter(out.ValidBits, out.ValidBitsOffset, int64(out.ReadUpperBound))
174 defer wr.Finish()
175 setCount := defLevelsBatchToBitmap(defLevels, out.ReadUpperBound, info, wr, hasRepeatedParent)
176 out.Read = int64(wr.Pos())
177 out.NullCount += out.Read - int64(setCount)
178 }
179
180
181 func DefLevelsToBitmap(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) {
182 hasRepeatedParent := false
183 if info.RepLevel > 0 {
184 hasRepeatedParent = true
185 }
186 defLevelsToBitmapInternal(defLevels, info, out, hasRepeatedParent)
187 }
188
189
190
191 func DefRepLevelsToListInfo(defLevels, repLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput, offsets []int32) error {
192 var wr utils.BitmapWriter
193 if out.ValidBits != nil {
194 wr = utils.NewFirstTimeBitmapWriter(out.ValidBits, out.ValidBitsOffset, out.ReadUpperBound)
195 defer wr.Finish()
196 }
197 offsetPos := 0
198 for idx := range defLevels {
199
200 if defLevels[idx] < info.RepeatedAncestorDefLevel || repLevels[idx] > info.RepLevel {
201 continue
202 }
203
204 if repLevels[idx] == info.RepLevel {
205
206
207 if offsetPos < len(offsets) {
208 if offsets[offsetPos] == math.MaxInt32 {
209 return xerrors.New("list index overflow")
210 }
211 offsets[offsetPos]++
212 }
213 } else {
214 if (wr != nil && int64(wr.Pos()) >= out.ReadUpperBound) || (offsetPos >= int(out.ReadUpperBound)) {
215 return fmt.Errorf("definition levels exceeded upper bound: %d", out.ReadUpperBound)
216 }
217
218
219
220
221 if offsetPos+1 < len(offsets) {
222 offsetPos++
223
224
225
226 offsets[offsetPos] = offsets[offsetPos-1]
227 if defLevels[idx] >= info.DefLevel {
228 if offsets[offsetPos] == math.MaxInt32 {
229 return xerrors.New("list index overflow")
230 }
231 offsets[offsetPos]++
232 }
233 }
234
235 if wr != nil {
236
237
238 if defLevels[idx] >= info.DefLevel-1 {
239 wr.Set()
240 } else {
241 out.NullCount++
242 wr.Clear()
243 }
244 wr.Next()
245 }
246 }
247 }
248
249 if len(offsets) > 0 {
250 out.Read = int64(offsetPos)
251 } else if wr != nil {
252 out.Read = int64(wr.Pos())
253 }
254
255 if out.NullCount > 0 && info.NullSlotUsage > 1 {
256 return xerrors.New("null values with null_slot_usage > 1 not supported.")
257 }
258 return nil
259 }
260
261
262
263 func DefRepLevelsToBitmap(defLevels, repLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) error {
264 info.RepLevel++
265 info.DefLevel++
266 return DefRepLevelsToListInfo(defLevels, repLevels, info, out, nil)
267 }
268
View as plain text