...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package metadata
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "math"
23 "unsafe"
24
25 "github.com/apache/arrow/go/v15/arrow"
26 "github.com/apache/arrow/go/v15/arrow/float16"
27 "github.com/apache/arrow/go/v15/arrow/memory"
28 "github.com/apache/arrow/go/v15/internal/utils"
29 "github.com/apache/arrow/go/v15/parquet"
30 "github.com/apache/arrow/go/v15/parquet/internal/debug"
31 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
32 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
33 "github.com/apache/arrow/go/v15/parquet/schema"
34 )
35
36
37
// StatProvider is the read-only view of a set of serialized column statistics:
// the min and max as raw bytes, the null and distinct counts, and for each of
// those an IsSet* predicate reporting whether the value is present.
//
// NOTE(review): presumably satisfied by the thrift-generated Statistics
// type - confirm against the callers.
type StatProvider interface {
	// GetMin returns the minimum value as raw bytes.
	GetMin() []byte
	// GetMax returns the maximum value as raw bytes.
	GetMax() []byte
	// GetNullCount returns the number of nulls.
	GetNullCount() int64
	// GetDistinctCount returns the number of distinct values.
	GetDistinctCount() int64
	// IsSetMax reports whether a maximum is present.
	IsSetMax() bool
	// IsSetMin reports whether a minimum is present.
	IsSetMin() bool
	// IsSetNullCount reports whether a null count is present.
	IsSetNullCount() bool
	// IsSetDistinctCount reports whether a distinct count is present.
	IsSetDistinctCount() bool
}
48
49
50
// EncodedStatistics carries column statistics in serialized form. Every value
// is paired with a Has* flag; a value is only meaningful (and only copied out
// by ToThrift) while its flag is true.
type EncodedStatistics struct {
	// HasMax reports whether Max is valid.
	HasMax bool
	// Max is the maximum value as raw bytes.
	Max []byte
	// HasMin reports whether Min is valid.
	HasMin bool
	// Min is the minimum value as raw bytes.
	Min []byte
	// Signed makes ToThrift fill the thrift Min/Max fields in addition to
	// MinValue/MaxValue.
	Signed bool
	// HasNullCount reports whether NullCount is valid.
	HasNullCount bool
	// NullCount is the number of nulls.
	NullCount int64
	// HasDistinctCount reports whether DistinctCount is valid.
	HasDistinctCount bool
	// DistinctCount is the number of distinct values.
	DistinctCount int64
}
62
63
64
65
66
67
68
69
70 func (e *EncodedStatistics) ApplyStatSizeLimits(length int) {
71 if len(e.Max) > length {
72 e.HasMax = false
73 }
74 if len(e.Min) > length {
75 e.HasMin = false
76 }
77 }
78
79
80 func (e *EncodedStatistics) IsSet() bool {
81 return e.HasMin || e.HasMax || e.HasNullCount || e.HasDistinctCount
82 }
83
84
85 func (e *EncodedStatistics) SetMax(val []byte) *EncodedStatistics {
86 e.Max = val[:]
87 e.HasMax = true
88 return e
89 }
90
91
92 func (e *EncodedStatistics) SetMin(val []byte) *EncodedStatistics {
93 e.Min = val[:]
94 e.HasMin = true
95 return e
96 }
97
98
99 func (e *EncodedStatistics) SetNullCount(val int64) *EncodedStatistics {
100 e.NullCount = val
101 e.HasNullCount = true
102 return e
103 }
104
105
106 func (e *EncodedStatistics) SetDistinctCount(val int64) *EncodedStatistics {
107 e.DistinctCount = val
108 e.HasDistinctCount = true
109 return e
110 }
111
112 func (e *EncodedStatistics) ToThrift() (stats *format.Statistics) {
113 stats = format.NewStatistics()
114 if e.HasMin {
115 stats.MinValue = e.Min
116
117 if e.Signed {
118 stats.Min = e.Min
119 }
120 }
121 if e.HasMax {
122 stats.MaxValue = e.Max
123
124 if e.Signed {
125 stats.Max = e.Max
126 }
127 }
128 if e.HasNullCount {
129 stats.NullCount = &e.NullCount
130 }
131 if e.HasDistinctCount {
132 stats.DistinctCount = &e.DistinctCount
133 }
134 return
135 }
136
137
138
// TypedStatistics is the type-independent interface of a column statistics
// accumulator: it exposes the running counts, the encoded min/max, and the
// operations used to update, merge and reset the accumulated state.
type TypedStatistics interface {
	// Type returns the parquet physical type of the column.
	Type() parquet.Type
	// HasMinMax reports whether min/max values are available.
	HasMinMax() bool
	// HasNullCount reports whether a null count is available.
	HasNullCount() bool
	// HasDistinctCount reports whether a distinct count is available.
	HasDistinctCount() bool
	// NullCount returns the accumulated null count.
	NullCount() int64
	// DistinctCount returns the accumulated distinct count.
	DistinctCount() int64
	// NumValues returns the accumulated value count.
	NumValues() int64
	// Descr returns the schema column these statistics belong to.
	Descr() *schema.Column
	// EncodeMin returns the minimum as raw bytes.
	// NOTE(review): the exact encoding is defined by the typed
	// implementations, which are outside this chunk.
	EncodeMin() []byte
	// EncodeMax returns the maximum as raw bytes (same caveat as EncodeMin).
	EncodeMax() []byte
	// Encode returns the statistics as an EncodedStatistics value.
	Encode() (EncodedStatistics, error)
	// Reset clears the accumulated state.
	Reset()
	// Merge folds the given statistics into the receiver.
	Merge(TypedStatistics)
	// UpdateFromArrow updates the statistics from an arrow array.
	// NOTE(review): updateCounts presumably controls whether the null/value
	// counts are updated along with min/max - confirm in the implementations.
	UpdateFromArrow(values arrow.Array, updateCounts bool) error
	// IncNulls adds to the null count.
	IncNulls(int64)
	// IncDistinct adds to the distinct count.
	IncDistinct(int64)
	// IncNumValues adds to the value count.
	IncNumValues(int64)
}
188
// statistics is the state shared by the per-type statistics implementations:
// the column descriptor, the presence flags, the running counts and the sort
// order used for comparisons.
type statistics struct {
	// descr is the column described; returned by Descr and used by Type.
	descr *schema.Column
	// hasMinMax, hasNullCount and hasDistinctCount back the Has* accessors
	// and are all cleared by Reset.
	hasMinMax        bool
	hasNullCount     bool
	hasDistinctCount bool
	// mem is not referenced in this chunk.
	// NOTE(review): presumably used by the typed implementations for buffers.
	mem memory.Allocator
	// nvalues is the running value count (IncNumValues, merge).
	nvalues int64
	// stats holds the running NullCount and DistinctCount.
	stats EncodedStatistics
	// order selects signed versus unsigned comparison and sentinel values in
	// the typed less/defaultMin/defaultMax methods.
	order schema.SortOrder

	// encoder is not referenced in this chunk.
	// NOTE(review): presumably used to encode min/max values - confirm.
	encoder encoding.TypedEncoder
}
201
202 func (s *statistics) IncNumValues(n int64) {
203 s.nvalues += n
204 }
205 func (s *statistics) IncNulls(n int64) {
206 s.stats.NullCount += n
207 s.hasNullCount = true
208 }
209 func (s *statistics) IncDistinct(n int64) {
210 s.stats.DistinctCount += n
211 s.hasDistinctCount = true
212 }
213
214 func (s *statistics) Descr() *schema.Column { return s.descr }
215 func (s *statistics) Type() parquet.Type { return s.descr.PhysicalType() }
216 func (s *statistics) HasDistinctCount() bool { return s.hasDistinctCount }
217 func (s *statistics) HasMinMax() bool { return s.hasMinMax }
218 func (s *statistics) HasNullCount() bool { return s.hasNullCount }
219 func (s *statistics) NullCount() int64 { return s.stats.NullCount }
220 func (s *statistics) DistinctCount() int64 { return s.stats.DistinctCount }
221 func (s *statistics) NumValues() int64 { return s.nvalues }
222
223 func (s *statistics) Reset() {
224 s.stats.NullCount = 0
225 s.stats.DistinctCount = 0
226 s.nvalues = 0
227 s.hasMinMax = false
228 s.hasDistinctCount = false
229 s.hasNullCount = false
230 }
231
232
233
234 func (s *statistics) merge(other TypedStatistics) {
235 s.nvalues += other.NumValues()
236 if other.HasNullCount() {
237 s.stats.NullCount += other.NullCount()
238 }
239 if other.HasDistinctCount() {
240
241
242
243
244 s.stats.DistinctCount += other.DistinctCount()
245 }
246 }
247
// coalesce returns fallback when val is a float32 or float64 NaN, and val
// itself in every other case (including non-float values).
func coalesce(val, fallback interface{}) interface{} {
	if f32, ok := val.(float32); ok && math.IsNaN(float64(f32)) {
		return fallback
	}
	if f64, ok := val.(float64); ok && math.IsNaN(f64) {
		return fallback
	}
	return val
}
261
// signedByteLess reports whether a < b when both slices are read as
// big-endian two's-complement signed integers of possibly different widths:
// the sign comes from the first byte, and a longer operand is compared
// against the sign extension of the shorter one.
//
// An empty slice sorts before any non-empty slice; two empty slices are not
// less than each other.
func signedByteLess(a, b []byte) bool {
	// With at least one empty operand the answer depends on lengths alone.
	if len(a) == 0 || len(b) == 0 {
		return len(a) == 0 && len(b) > 0
	}

	// Reinterpret the bytes as int8 so the first byte can be compared with
	// its sign. sa/sb keep referring to the original, untrimmed operands even
	// after a/b are re-sliced below.
	sa := *(*[]int8)(unsafe.Pointer(&a))
	sb := *(*[]int8)(unsafe.Pointer(&b))

	// Short circuit: different signs, or same length with different leading
	// bytes. The same-length requirement matters because, with unequal
	// lengths, leading bytes may be pure sign extension.
	if int8(0x80&uint8(sa[0])) != int8(0x80&uint8(sb[0])) || (len(sa) == len(sb) && sa[0] != sb[0]) {
		return sa[0] < sb[0]
	}

	// Same sign from here on. For unequal lengths, split off the extra
	// leading bytes of the longer operand; once both remainders have equal
	// length an unsigned lexicographic comparison decides.
	if len(a) != len(b) {
		var lead []byte
		if len(a) > len(b) {
			leadLen := len(a) - len(b)
			lead = a[:leadLen]
			a = a[leadLen:]
		} else {
			debug.Assert(len(a) < len(b), "something weird in byte slice signed comparison")
			leadLen := len(b) - len(a)
			lead = b[:leadLen]
			b = b[leadLen:]
		}

		// The extra bytes are insignificant only if they all equal the sign
		// extension byte (0xFF for negative, 0x00 otherwise).
		var extension byte
		if sa[0] < 0 {
			extension = 0xFF
		}

		notequal := false
		for _, c := range lead {
			if c != extension {
				notequal = true
				break
			}
		}

		if notequal {
			// The longer operand has the larger magnitude:
			//   negative, b longer -> b is smaller -> false
			//   negative, a longer -> a is smaller -> true
			//   positive, b longer -> b is larger  -> true
			//   positive, a longer -> a is larger  -> false
			neg := sa[0] < 0
			blonger := len(sa) < len(sb)
			return neg != blonger
		}
	} else {
		// Equal lengths with identical first bytes: skip that byte.
		a = a[1:]
		b = b[1:]
	}

	return bytes.Compare(a, b) == -1
}
339
340 func (BooleanStatistics) defaultMin() bool { return true }
341 func (BooleanStatistics) defaultMax() bool { return false }
342 func (s *Int32Statistics) defaultMin() int32 {
343 if s.order == schema.SortUNSIGNED {
344 val := uint32(math.MaxUint32)
345 return int32(val)
346 }
347 return math.MaxInt32
348 }
349
350 func (s *Int32Statistics) defaultMax() int32 {
351 if s.order == schema.SortUNSIGNED {
352 return int32(0)
353 }
354 return math.MinInt32
355 }
356
357 func (s *Int64Statistics) defaultMin() int64 {
358 if s.order == schema.SortUNSIGNED {
359 val := uint64(math.MaxUint64)
360 return int64(val)
361 }
362 return math.MaxInt64
363 }
364
365 func (s *Int64Statistics) defaultMax() int64 {
366 if s.order == schema.SortUNSIGNED {
367 return int64(0)
368 }
369 return math.MinInt64
370 }
371
372 var (
373 defaultMinInt96 parquet.Int96
374 defaultMinUInt96 parquet.Int96
375 defaultMaxInt96 parquet.Int96
376 defaultMaxUInt96 parquet.Int96
377
378 defaultMinFloat16 parquet.FixedLenByteArray = float16.MaxNum.ToLEBytes()
379 defaultMaxFloat16 parquet.FixedLenByteArray = float16.MinNum.ToLEBytes()
380 )
381
382 func init() {
383 i96 := arrow.Uint32Traits.CastFromBytes(defaultMinInt96[:])
384 i96[0] = math.MaxUint32
385 i96[1] = math.MaxUint32
386 i96[2] = math.MaxInt32
387
388 i96 = arrow.Uint32Traits.CastFromBytes(defaultMinUInt96[:])
389 i96[0] = math.MaxUint32
390 i96[1] = math.MaxUint32
391 i96[2] = math.MaxUint32
392
393
394 i96 = arrow.Uint32Traits.CastFromBytes(defaultMaxInt96[:])
395 i96[2] = math.MaxInt32 + 1
396
397
398 }
399
400 func (s *Int96Statistics) defaultMin() parquet.Int96 {
401 if s.order == schema.SortUNSIGNED {
402 return defaultMinUInt96
403 }
404 return defaultMinInt96
405 }
406
407 func (s *Int96Statistics) defaultMax() parquet.Int96 {
408 if s.order == schema.SortUNSIGNED {
409 return defaultMaxUInt96
410 }
411 return defaultMaxInt96
412 }
413
414 func (Float16Statistics) defaultMin() parquet.FixedLenByteArray {
415 return defaultMinFloat16
416 }
417
418 func (Float16Statistics) defaultMax() parquet.FixedLenByteArray {
419 return defaultMaxFloat16
420 }
421
422 func (Float32Statistics) defaultMin() float32 { return math.MaxFloat32 }
423 func (Float32Statistics) defaultMax() float32 { return -math.MaxFloat32 }
424 func (Float64Statistics) defaultMin() float64 { return math.MaxFloat64 }
425 func (Float64Statistics) defaultMax() float64 { return -math.MaxFloat64 }
426 func (ByteArrayStatistics) defaultMin() parquet.ByteArray { return nil }
427 func (ByteArrayStatistics) defaultMax() parquet.ByteArray { return nil }
428 func (FixedLenByteArrayStatistics) defaultMin() parquet.FixedLenByteArray { return nil }
429 func (FixedLenByteArrayStatistics) defaultMax() parquet.FixedLenByteArray { return nil }
430
431 func (BooleanStatistics) equal(a, b bool) bool { return a == b }
432 func (Int32Statistics) equal(a, b int32) bool { return a == b }
433 func (Int64Statistics) equal(a, b int64) bool { return a == b }
434 func (Float32Statistics) equal(a, b float32) bool { return a == b }
435 func (Float64Statistics) equal(a, b float64) bool { return a == b }
436 func (Int96Statistics) equal(a, b parquet.Int96) bool { return bytes.Equal(a[:], b[:]) }
437 func (ByteArrayStatistics) equal(a, b parquet.ByteArray) bool { return bytes.Equal(a, b) }
438 func (FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool {
439 return bytes.Equal(a, b)
440 }
441
442 func (Float16Statistics) equal(a, b parquet.FixedLenByteArray) bool {
443 return float16.FromLEBytes(a).Equal(float16.FromLEBytes(b))
444 }
445
446 func (BooleanStatistics) less(a, b bool) bool {
447 return !a && b
448 }
449
450 func (s *Int32Statistics) less(a, b int32) bool {
451 if s.order == schema.SortUNSIGNED {
452 return uint32(a) < uint32(b)
453 }
454 return a < b
455 }
456
457 func (s *Int64Statistics) less(a, b int64) bool {
458 if s.order == schema.SortUNSIGNED {
459 return uint64(a) < uint64(b)
460 }
461 return a < b
462 }
463 func (Float32Statistics) less(a, b float32) bool { return a < b }
464 func (Float64Statistics) less(a, b float64) bool { return a < b }
465 func (s *Int96Statistics) less(a, b parquet.Int96) bool {
466 i96a := arrow.Uint32Traits.CastFromBytes(a[:])
467 i96b := arrow.Uint32Traits.CastFromBytes(b[:])
468
469 a0, a1, a2 := utils.ToLEUint32(i96a[0]), utils.ToLEUint32(i96a[1]), utils.ToLEUint32(i96a[2])
470 b0, b1, b2 := utils.ToLEUint32(i96b[0]), utils.ToLEUint32(i96b[1]), utils.ToLEUint32(i96b[2])
471
472 if a2 != b2 {
473
474 if s.order == schema.SortSIGNED {
475 return int32(a2) < int32(b2)
476 }
477 return a2 < b2
478 } else if a1 != b1 {
479 return a1 < b1
480 }
481 return a0 < b0
482 }
483
484 func (s *ByteArrayStatistics) less(a, b parquet.ByteArray) bool {
485 if s.order == schema.SortUNSIGNED {
486 return bytes.Compare(a, b) == -1
487 }
488
489 return signedByteLess([]byte(a), []byte(b))
490 }
491
492 func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool {
493 if s.order == schema.SortUNSIGNED {
494 return bytes.Compare(a, b) == -1
495 }
496
497 return signedByteLess([]byte(a), []byte(b))
498 }
499
500 func (Float16Statistics) less(a, b parquet.FixedLenByteArray) bool {
501 return float16.FromLEBytes(a).Less(float16.FromLEBytes(b))
502 }
503
504 func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean { return &minMax }
505 func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32 { return &minMax }
506 func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64 { return &minMax }
507 func (Int96Statistics) cleanStat(minMax minmaxPairInt96) *minmaxPairInt96 { return &minMax }
508
509
510
511
512
513
514
515
516 func (Float32Statistics) cleanStat(minMax minmaxPairFloat32) *minmaxPairFloat32 {
517 if math.IsNaN(float64(minMax[0])) || math.IsNaN(float64(minMax[1])) {
518 return nil
519 }
520
521 if minMax[0] == math.MaxFloat32 && minMax[1] == -math.MaxFloat32 {
522 return nil
523 }
524
525 var zero float32 = 0
526 if minMax[0] == zero && !math.Signbit(float64(minMax[0])) {
527 minMax[0] = -minMax[0]
528 }
529
530 if minMax[1] == zero && math.Signbit(float64(minMax[1])) {
531 minMax[1] = -minMax[1]
532 }
533
534 return &minMax
535 }
536
537 func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 {
538 if math.IsNaN(minMax[0]) || math.IsNaN(minMax[1]) {
539 return nil
540 }
541
542 if minMax[0] == math.MaxFloat64 && minMax[1] == -math.MaxFloat64 {
543 return nil
544 }
545
546 var zero float64 = 0
547 if minMax[0] == zero && !math.Signbit(minMax[0]) {
548 minMax[0] = -minMax[0]
549 }
550
551 if minMax[1] == zero && math.Signbit(minMax[1]) {
552 minMax[1] = -minMax[1]
553 }
554
555 return &minMax
556 }
557
558 func (Float16Statistics) cleanStat(minMax minmaxPairFloat16) *minmaxPairFloat16 {
559 min := float16.FromLEBytes(minMax[0][:])
560 max := float16.FromLEBytes(minMax[1][:])
561
562 if min.IsNaN() || max.IsNaN() {
563 return nil
564 }
565
566 if min.Equal(float16.MaxNum) && max.Equal(float16.MinNum) {
567 return nil
568 }
569
570 zero := float16.New(0)
571 if min.Equal(zero) && !min.Signbit() {
572 minMax[0] = min.Negate().ToLEBytes()
573 }
574 if max.Equal(zero) && max.Signbit() {
575 minMax[1] = max.Negate().ToLEBytes()
576 }
577
578 return &minMax
579 }
580
581 func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray {
582 if minMax[0] == nil || minMax[1] == nil {
583 return nil
584 }
585 return &minMax
586 }
587
588 func (FixedLenByteArrayStatistics) cleanStat(minMax minmaxPairFixedLenByteArray) *minmaxPairFixedLenByteArray {
589 if minMax[0] == nil || minMax[1] == nil {
590 return nil
591 }
592 return &minMax
593 }
594
595 func GetStatValue(typ parquet.Type, val []byte) interface{} {
596 switch typ {
597 case parquet.Types.Boolean:
598 return val[0] != 0
599 case parquet.Types.Int32:
600 return int32(binary.LittleEndian.Uint32(val))
601 case parquet.Types.Int64:
602 return int64(binary.LittleEndian.Uint64(val))
603 case parquet.Types.Int96:
604 p := parquet.Int96{}
605 copy(p[:], val)
606 return p
607 case parquet.Types.Float:
608 return math.Float32frombits(binary.LittleEndian.Uint32(val))
609 case parquet.Types.Double:
610 return math.Float64frombits(binary.LittleEndian.Uint64(val))
611 case parquet.Types.ByteArray:
612 fallthrough
613 case parquet.Types.FixedLenByteArray:
614 return val
615 }
616 return nil
617 }
618
View as plain text