1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package pqarrow
18
19 import (
20 "fmt"
21 "sync/atomic"
22 "unsafe"
23
24 "github.com/apache/arrow/go/v15/arrow"
25 "github.com/apache/arrow/go/v15/arrow/array"
26 "github.com/apache/arrow/go/v15/arrow/memory"
27 "github.com/apache/arrow/go/v15/internal/bitutils"
28 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
29 "golang.org/x/xerrors"
30 )
31
// iterResult signals how the path traversal should continue after a node
// runs: iterDone pops back to the parent node, iterNext descends to the
// child node. The numeric values (-1/+1) are added directly to the stack
// position in writePath.
type iterResult int8

const (
	iterDone iterResult = -1
	iterNext iterResult = 1
)
38
// elemRange is a half-open interval [start, end) of element indices.
type elemRange struct {
	start int64
	end   int64
}

// size returns the number of elements covered by the range.
func (e elemRange) size() int64 { return e.end - e.start }

// empty reports whether the range covers no elements.
func (e elemRange) empty() bool { return e.size() == 0 }
46
// rangeSelector maps the index of a parent (list) element to the range of
// child elements it spans.
type rangeSelector interface {
	GetRange(idx int64) elemRange
}
50
51 type varRangeSelector struct {
52 offsets []int32
53 }
54
55 func (v varRangeSelector) GetRange(idx int64) elemRange {
56 return elemRange{int64(v.offsets[idx]), int64(v.offsets[idx+1])}
57 }
58
59 type fixedSizeRangeSelector struct {
60 listSize int32
61 }
62
63 func (f fixedSizeRangeSelector) GetRange(idx int64) elemRange {
64 start := idx * int64(f.listSize)
65 return elemRange{start, start + int64(f.listSize)}
66 }
67
// pathNode is a single level in a def/rep-level generation path. Nodes are
// cloned so each leaf path owns independent traversal state.
type pathNode interface {
	clone() pathNode
}

// allPresentTerminalNode is a leaf for arrays known to contain no nulls:
// every element receives the same definition level.
type allPresentTerminalNode struct {
	defLevel int16
}

func (n *allPresentTerminalNode) clone() pathNode {
	ret := *n
	return &ret
}

// run appends defLevel once per element in rng.
func (n *allPresentTerminalNode) run(rng elemRange, ctx *pathWriteCtx) iterResult {
	return ctx.AppendDefLevels(int(rng.size()), n.defLevel)
}

// allNullsTerminalNode is a leaf for arrays that are entirely null.
type allNullsTerminalNode struct {
	defLevel int16
	// repLevel of -1 means there are no repetition levels to write
	// (see fillRepLevels, which is a no-op for -1).
	repLevel int16
}

func (n *allNullsTerminalNode) clone() pathNode {
	ret := *n
	return &ret
}

// run appends the rep level (when applicable) and the def level once per
// element in rng.
func (n *allNullsTerminalNode) run(rng elemRange, ctx *pathWriteCtx) iterResult {
	fillRepLevels(int(rng.size()), n.repLevel, ctx)
	return ctx.AppendDefLevels(int(rng.size()), n.defLevel)
}
99
// nullableTerminalNode is a leaf for arrays with a mix of valid and null
// entries; the validity bitmap decides which def level each element gets.
type nullableTerminalNode struct {
	bitmap []byte
	// elemOffset is the position of the array's first element within bitmap.
	elemOffset        int64
	defLevelIfPresent int16
	defLevelIfNull    int16
}

func (n *nullableTerminalNode) clone() pathNode {
	ret := *n
	return &ret
}

// run writes one def level per element in rng, chosen by the element's
// validity bit, copying whole runs of identical bits at a time.
func (n *nullableTerminalNode) run(rng elemRange, ctx *pathWriteCtx) iterResult {
	elems := rng.size()
	ctx.ReserveDefLevels(int(elems))

	// View each int16 def level as its two raw bytes so runs can be bulk
	// copied via UnsafeWriteCopy (safe only because of the Reserve above).
	var (
		present = (*(*[2]byte)(unsafe.Pointer(&n.defLevelIfPresent)))[:]
		null = (*(*[2]byte)(unsafe.Pointer(&n.defLevelIfNull)))[:]
	)
	rdr := bitutils.NewBitRunReader(n.bitmap, n.elemOffset+rng.start, elems)
	for {
		run := rdr.NextRun()
		if run.Len == 0 {
			break
		}
		if run.Set {
			ctx.defLevels.UnsafeWriteCopy(int(run.Len), present)
		} else {
			ctx.defLevels.UnsafeWriteCopy(int(run.Len), null)
		}
	}
	return iterDone
}
134
// listNode handles one list (or map) nesting level: it translates parent
// element ranges into child ranges via selector and emits levels for
// empty lists directly.
type listNode struct {
	selector rangeSelector
	// prevRepLevel is the repetition level of the enclosing level; repLevel
	// is the one introduced by this list.
	prevRepLevel int16
	repLevel     int16
	defLevelIfEmpty int16
	// isLast is true for the innermost list (set by fixup), enabling the
	// batched fillForLast fast path.
	isLast bool
}

func (n *listNode) clone() pathNode {
	ret := *n
	return &ret
}

// run consumes elements from rng, emitting levels for empty lists and
// producing in childRng the children of the first non-empty list. Returns
// iterNext to descend into childRng, iterDone once rng is exhausted.
func (n *listNode) run(rng, childRng *elemRange, ctx *pathWriteCtx) iterResult {
	if rng.empty() {
		return iterDone
	}

	// Skip past leading empty lists, remembering the starting point so the
	// number of empties can be recorded below.
	start := rng.start
	for {
		// childRng becomes the child span of the current parent element.
		*childRng = n.selector.GetRange(rng.start)
		if !childRng.empty() {
			break
		}
		rng.start++
		if rng.empty() {
			break
		}
	}

	// Empty lists terminate here: each gets this node's "empty" def level
	// and repeats at the parent's rep level.
	emptyElems := rng.start - start
	if emptyElems > 0 {
		fillRepLevels(int(emptyElems), n.prevRepLevel, ctx)
		ctx.AppendDefLevels(int(emptyElems), n.defLevelIfEmpty)
	}

	// The first value of a new list entry repeats at the parent's level.
	// Only write it while rep and def counts agree, i.e. no enclosing node
	// has already emitted the rep level for this entry.
	if ctx.equalRepDeflevlsLen() && !rng.empty() {
		ctx.AppendRepLevel(n.prevRepLevel)
	}

	if rng.empty() {
		return iterDone
	}

	rng.start++
	if n.isLast {
		// Innermost list: batch the remaining siblings here instead of
		// bouncing back up the node stack for every element.
		return n.fillForLast(rng, childRng, ctx)
	}

	return iterNext
}

// fillForLast (innermost list only) writes rep levels for the current
// children, then greedily folds in consecutive non-empty sibling lists,
// extending childRng so the terminal node processes them in one pass.
func (n *listNode) fillForLast(rng, childRng *elemRange, ctx *pathWriteCtx) iterResult {
	fillRepLevels(int(childRng.size()), n.repLevel, ctx)

	// Consume following siblings while they are non-empty. An empty
	// sibling must go back through run so it receives its empty-list
	// levels.
	for !rng.empty() {
		sizeCheck := n.selector.GetRange(rng.start)
		if sizeCheck.empty() {
			break
		}

		// The first child of each sibling repeats at the parent level,
		// the rest at this list's level.
		ctx.AppendRepLevel(n.prevRepLevel)
		ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel)
		childRng.end = sizeCheck.end
		rng.start++
	}

	// Record which leaf elements this pass covered so values can later be
	// matched up with the emitted levels.
	ctx.recordPostListVisit(*childRng)
	return iterNext
}
241
// nullableNode handles a nullable non-terminal level: runs of null
// parents emit their levels directly, runs of valid parents are passed
// down to the child node.
type nullableNode struct {
	bitmap []byte
	// entryOffset is the position of the first element within bitmap.
	entryOffset    int64
	repLevelIfNull int16 // -1 when there are no repetition levels to write
	defLevelIfNull int16

	// validBitsReader persists across run calls so successive invocations
	// continue where the previous one stopped; newRange marks when it must
	// be (re)created for a fresh parent range.
	validBitsReader bitutils.BitRunReader
	newRange        bool
}

func (n *nullableNode) clone() pathNode {
	var ret nullableNode = *n
	return &ret
}

// run emits levels for a leading run of null entries, then exposes the
// following run of valid entries as childRng for the next node.
func (n *nullableNode) run(rng, childRng *elemRange, ctx *pathWriteCtx) iterResult {
	if n.newRange {
		n.validBitsReader = bitutils.NewBitRunReader(n.bitmap, n.entryOffset+rng.start, rng.size())
	}
	childRng.start = rng.start
	run := n.validBitsReader.NextRun()
	if !run.Set {
		// Null parents terminate here: write their levels and advance to
		// the next (valid) run.
		rng.start += run.Len
		fillRepLevels(int(run.Len), n.repLevelIfNull, ctx)
		ctx.AppendDefLevels(int(run.Len), n.defLevelIfNull)
		run = n.validBitsReader.NextRun()
	}

	if rng.empty() {
		n.newRange = true
		return iterDone
	}
	childRng.start = rng.start
	childRng.end = childRng.start
	childRng.end += run.Len
	rng.start += childRng.size()
	n.newRange = false
	return iterNext
}
281
282 type pathInfo struct {
283 path []pathNode
284 primitiveArr arrow.Array
285 maxDefLevel int16
286 maxRepLevel int16
287 leafIsNullable bool
288 }
289
290 func (p pathInfo) clone() pathInfo {
291 ret := p
292 ret.path = make([]pathNode, len(p.path))
293 for idx, n := range p.path {
294 ret.path[idx] = n.clone()
295 }
296 return ret
297 }
298
// pathBuilder walks an Arrow array and constructs one pathInfo per
// primitive leaf column reachable from it.
type pathBuilder struct {
	// info is the path currently being built; completed (cloned) paths
	// accumulate in paths.
	info  pathInfo
	paths []pathInfo
	// nullableInParent tracks whether the field enclosing the array being
	// visited is nullable.
	nullableInParent bool

	refCount int64
}

// Retain increments the reference count.
func (p *pathBuilder) Retain() {
	atomic.AddInt64(&p.refCount, 1)
}

// Release decrements the reference count; at zero the retained leaf
// arrays are released and cleared.
func (p *pathBuilder) Release() {
	if atomic.AddInt64(&p.refCount, -1) == 0 {
		for idx := range p.paths {
			p.paths[idx].primitiveArr.Release()
			p.paths[idx].primitiveArr = nil
		}
	}
}
319
320
321
322
323
// lazyNullCount returns the null count recorded on the array's data,
// which may be array.UnknownNullCount if it has not been computed.
func lazyNullCount(arr arrow.Array) int64 {
	return int64(arr.Data().NullN())
}

// lazyNoNulls reports whether the array provably has no nulls without
// scanning: either the recorded count is zero, or it is unknown but no
// validity bitmap exists at all.
func lazyNoNulls(arr arrow.Array) bool {
	nulls := lazyNullCount(arr)
	return nulls == 0 || (nulls == array.UnknownNullCount && arr.NullBitmapBytes() == nil)
}
332
// fixupVisitor patches rep levels along a completed path: the list at the
// maximum rep level is marked innermost (isLast), and null-handling nodes
// inherit the rep level of the nearest enclosing list seen so far.
type fixupVisitor struct {
	maxRepLevel    int
	repLevelIfNull int16
}

// visit patches a single node; nodes must be visited root-to-leaf so that
// repLevelIfNull carries the enclosing list's level forward.
func (f *fixupVisitor) visit(n pathNode) {
	switch n := n.(type) {
	case *listNode:
		if n.repLevel == int16(f.maxRepLevel) {
			// Innermost list: nothing below it repeats.
			n.isLast = true
			f.repLevelIfNull = -1
		} else {
			f.repLevelIfNull = n.repLevel
		}
	case *nullableTerminalNode:
	case *allPresentTerminalNode:
		// These terminals carry no rep level to patch.
	case *allNullsTerminalNode:
		if f.repLevelIfNull != -1 {
			n.repLevel = f.repLevelIfNull
		}
	case *nullableNode:
		if f.repLevelIfNull != -1 {
			n.repLevelIfNull = f.repLevelIfNull
		}
	}
}
359
360 func fixup(info pathInfo) pathInfo {
361
362 if info.maxRepLevel == 0 {
363 return info
364 }
365
366 visitor := fixupVisitor{maxRepLevel: int(info.maxRepLevel)}
367 if visitor.maxRepLevel > 0 {
368 visitor.repLevelIfNull = 0
369 } else {
370 visitor.repLevelIfNull = -1
371 }
372
373 for _, p := range info.path {
374 visitor.visit(p)
375 }
376 return info
377 }
378
// Visit recursively descends arr, appending one path node per nesting
// level and recording a completed path at every primitive leaf.
func (p *pathBuilder) Visit(arr arrow.Array) error {
	switch arr.DataType().ID() {
	case arrow.LIST, arrow.MAP:
		p.maybeAddNullable(arr)
		// A list contributes one def level (for the empty list) and one
		// rep level.
		p.info.maxDefLevel++
		p.info.maxRepLevel++
		larr, ok := arr.(*array.List)
		if !ok {
			// Maps are represented as lists of key/value entries.
			larr = arr.(*array.Map).List
		}

		p.info.path = append(p.info.path, &listNode{
			selector: varRangeSelector{larr.Offsets()[larr.Data().Offset():]},
			prevRepLevel: p.info.maxRepLevel - 1,
			repLevel: p.info.maxRepLevel,
			defLevelIfEmpty: p.info.maxDefLevel - 1,
		})
		// NOTE(review): ok is false for maps, so map children are treated
		// as non-nullable in their parent — confirm against the parquet
		// map specification.
		p.nullableInParent = ok
		return p.Visit(larr.ListValues())
	case arrow.FIXED_SIZE_LIST:
		p.maybeAddNullable(arr)
		larr := arr.(*array.FixedSizeList)
		listSize := larr.DataType().(*arrow.FixedSizeListType).Len()
		// Unlike variable lists, defLevelIfEmpty here equals the full def
		// level — presumably because a fixed-size list is never empty.
		p.info.maxDefLevel++
		p.info.maxRepLevel++
		p.info.path = append(p.info.path, &listNode{
			selector: fixedSizeRangeSelector{listSize},
			prevRepLevel: p.info.maxRepLevel - 1,
			repLevel: p.info.maxRepLevel,
			defLevelIfEmpty: p.info.maxDefLevel,
		})

		return p.Visit(larr.ListValues())
	case arrow.DICTIONARY:
		// Only dictionaries with non-nested values and no nulls encoded
		// in the dictionary itself are supported.
		dictArr := arr.(*array.Dictionary)
		valType := dictArr.DataType().(*arrow.DictionaryType).ValueType
		if _, ok := valType.(arrow.NestedType); ok {
			return fmt.Errorf("%w: writing DictionaryArray with nested dictionary type not yet supported",
				arrow.ErrNotImplemented)
		}
		if dictArr.Dictionary().NullN() > 0 {
			return fmt.Errorf("%w: writing DictionaryArray with null encoded in dictionary not yet supported",
				arrow.ErrNotImplemented)
		}
		p.addTerminalInfo(arr)
		return nil
	case arrow.STRUCT:
		p.maybeAddNullable(arr)
		// Each struct field starts its own path sharing the prefix built
		// so far; restore the snapshot after each child.
		infoBackup := p.info
		dt := arr.DataType().(*arrow.StructType)
		for idx, f := range dt.Fields() {
			p.nullableInParent = f.Nullable
			if err := p.Visit(arr.(*array.Struct).Field(idx)); err != nil {
				return err
			}
			p.info = infoBackup
		}
		return nil
	case arrow.EXTENSION:
		return p.Visit(arr.(array.ExtensionArray).Storage())
	case arrow.SPARSE_UNION, arrow.DENSE_UNION:
		return xerrors.New("union types aren't supported in parquet")
	default:
		// Primitive leaf: finish the current path.
		p.addTerminalInfo(arr)
		return nil
	}
}
451
// addTerminalInfo completes the current path at a primitive leaf array,
// choosing the cheapest terminal node for its null profile, and records a
// fixed-up clone of the path.
func (p *pathBuilder) addTerminalInfo(arr arrow.Array) {
	p.info.leafIsNullable = p.nullableInParent
	if p.nullableInParent {
		// A nullable leaf contributes one more def level.
		p.info.maxDefLevel++
	}

	// All-present and all-null arrays get specialized terminals that
	// avoid consulting the validity bitmap per element.
	if lazyNoNulls(arr) {
		p.info.path = append(p.info.path, &allPresentTerminalNode{p.info.maxDefLevel})
		p.info.leafIsNullable = false
	} else if lazyNullCount(arr) == int64(arr.Len()) {
		p.info.path = append(p.info.path, &allNullsTerminalNode{p.info.maxDefLevel - 1, -1})
	} else {
		p.info.path = append(p.info.path, &nullableTerminalNode{bitmap: arr.NullBitmapBytes(), elemOffset: int64(arr.Data().Offset()), defLevelIfPresent: p.info.maxDefLevel, defLevelIfNull: p.info.maxDefLevel - 1})
	}
	// The path owns a reference to the leaf; released in pathBuilder.Release.
	arr.Retain()
	p.info.primitiveArr = arr
	p.paths = append(p.paths, fixup(p.info.clone()))
}
473
// maybeAddNullable accounts for arr's own nullability when its parent
// field is nullable: it bumps the def level and, unless nulls are
// impossible (or total), inserts a nullableNode to split valid/null runs.
func (p *pathBuilder) maybeAddNullable(arr arrow.Array) {
	if !p.nullableInParent {
		return
	}

	p.info.maxDefLevel++
	// Provably no nulls: the def level bump alone suffices, no node needed.
	if lazyNoNulls(arr) {
		return
	}

	// Entirely null: the path effectively terminates here for every element.
	if lazyNullCount(arr) == int64(arr.Len()) {
		p.info.path = append(p.info.path, &allNullsTerminalNode{p.info.maxDefLevel - 1, -1})
		return
	}

	// repLevelIfNull of -1 is a placeholder; fixup patches in the real
	// rep level once the whole path is known.
	p.info.path = append(p.info.path, &nullableNode{
		bitmap: arr.NullBitmapBytes(), entryOffset: int64(arr.Data().Offset()),
		defLevelIfNull: p.info.maxDefLevel - 1, repLevelIfNull: -1,
		newRange: true,
	})
}
495
// multipathLevelBuilder computes definition and repetition levels for
// every primitive leaf column of a (possibly nested) Arrow array.
type multipathLevelBuilder struct {
	// rootRange spans all top-level elements of the array.
	rootRange elemRange
	data      arrow.ArrayData
	builder   pathBuilder

	refCount int64
}

// Retain increments the reference count.
func (m *multipathLevelBuilder) Retain() {
	atomic.AddInt64(&m.refCount, 1)
}

// Release decrements the reference count; at zero the retained array
// data and the path builder's references are released.
func (m *multipathLevelBuilder) Release() {
	if atomic.AddInt64(&m.refCount, -1) == 0 {
		m.data.Release()
		m.data = nil
		m.builder.Release()
		m.builder = pathBuilder{}
	}
}

// newMultipathLevelBuilder eagerly builds the leaf paths for arr.
// fieldNullable states whether the enclosing schema field is nullable.
func newMultipathLevelBuilder(arr arrow.Array, fieldNullable bool) (*multipathLevelBuilder, error) {
	ret := &multipathLevelBuilder{
		refCount: 1,
		rootRange: elemRange{int64(0), int64(arr.Data().Len())},
		data: arr.Data(),
		builder: pathBuilder{nullableInParent: fieldNullable, paths: make([]pathInfo, 0), refCount: 1},
	}
	if err := ret.builder.Visit(arr); err != nil {
		return nil, err
	}
	arr.Data().Retain()
	return ret, nil
}

// leafCount returns the number of primitive leaf columns found.
func (m *multipathLevelBuilder) leafCount() int {
	return len(m.builder.paths)
}
534
535 func (m *multipathLevelBuilder) write(leafIdx int, ctx *arrowWriteContext) (multipathLevelResult, error) {
536 return writePath(m.rootRange, &m.builder.paths[leafIdx], ctx)
537 }
538
539 func (m *multipathLevelBuilder) writeAll(ctx *arrowWriteContext) (res []multipathLevelResult, err error) {
540 res = make([]multipathLevelResult, m.leafCount())
541 for idx := range res {
542 res[idx], err = m.write(idx, ctx)
543 if err != nil {
544 break
545 }
546 }
547 return
548 }
549
// multipathLevelResult is the outcome of writing one leaf path: the leaf
// array plus its def/rep levels and the buffers backing those slices.
type multipathLevelResult struct {
	leafArr         arrow.Array
	defLevels       []int16
	defLevelsBuffer encoding.Buffer
	repLevels       []int16
	repLevelsBuffer encoding.Buffer

	// postListVisitedElems records, in order, the ranges of leaf elements
	// visited beneath the innermost list; presumably used downstream to
	// slice the leaf values matching the emitted levels.
	postListVisitedElems []elemRange

	leafIsNullable bool
}

// Release frees the level buffers. The level slices point into those
// buffers and are cleared alongside; repLevelsBuffer is only released
// when rep levels were actually produced.
func (m *multipathLevelResult) Release() {
	m.defLevels = nil
	if m.defLevelsBuffer != nil {
		m.defLevelsBuffer.Release()
	}
	if m.repLevels != nil {
		m.repLevels = nil
		m.repLevelsBuffer.Release()
	}
}
581
// pathWriteCtx carries the output buffers shared by all nodes while a
// single path is being written. The Append* helpers return an iterResult
// so terminal nodes can return the append call directly.
type pathWriteCtx struct {
	mem          memory.Allocator
	defLevels    *int16BufferBuilder
	repLevels    *int16BufferBuilder
	visitedElems []elemRange
}

// ReserveDefLevels pre-allocates room for elems def levels, enabling the
// Unsafe* writers afterwards.
func (p *pathWriteCtx) ReserveDefLevels(elems int) iterResult {
	p.defLevels.Reserve(elems)
	return iterDone
}

func (p *pathWriteCtx) AppendDefLevel(lvl int16) iterResult {
	p.defLevels.Append(lvl)
	return iterDone
}

// AppendDefLevels appends count copies of defLevel.
func (p *pathWriteCtx) AppendDefLevels(count int, defLevel int16) iterResult {
	p.defLevels.AppendCopies(count, defLevel)
	return iterDone
}

// UnsafeAppendDefLevel appends without a capacity check; callers must
// have called ReserveDefLevels first.
func (p *pathWriteCtx) UnsafeAppendDefLevel(v int16) iterResult {
	p.defLevels.UnsafeAppend(v)
	return iterDone
}

func (p *pathWriteCtx) AppendRepLevel(lvl int16) iterResult {
	p.repLevels.Append(lvl)
	return iterDone
}

// AppendRepLevels appends count copies of lvl.
func (p *pathWriteCtx) AppendRepLevels(count int, lvl int16) iterResult {
	p.repLevels.AppendCopies(count, lvl)
	return iterDone
}

// equalRepDeflevlsLen reports whether equally many def and rep levels
// have been written so far.
func (p *pathWriteCtx) equalRepDeflevlsLen() bool { return p.defLevels.Len() == p.repLevels.Len() }

// recordPostListVisit notes a range of leaf elements covered by the most
// recent innermost-list pass, merging with the previous range when the
// two are contiguous.
func (p *pathWriteCtx) recordPostListVisit(rng elemRange) {
	if len(p.visitedElems) > 0 && rng.start == p.visitedElems[len(p.visitedElems)-1].end {
		p.visitedElems[len(p.visitedElems)-1].end = rng.end
		return
	}
	p.visitedElems = append(p.visitedElems, rng)
}
628
// int16BufferBuilder writes int16 values into a pooled byte buffer by
// reinterpreting each value as its two raw (native-endian) bytes.
type int16BufferBuilder struct {
	*encoding.PooledBufferWriter
}

// Values reinterprets the accumulated bytes as an int16 slice (no copy).
func (b *int16BufferBuilder) Values() []int16 {
	return arrow.Int16Traits.CastFromBytes(b.PooledBufferWriter.Bytes())
}

func (b *int16BufferBuilder) Value(i int) int16 {
	return b.Values()[i]
}

// Reserve ensures capacity for n additional int16 values.
func (b *int16BufferBuilder) Reserve(n int) {
	b.PooledBufferWriter.Reserve(n * arrow.Int16SizeBytes)
}

// Len returns the number of int16 values written so far.
func (b *int16BufferBuilder) Len() int { return b.PooledBufferWriter.Len() / arrow.Int16SizeBytes }

// AppendCopies appends count copies of val, reserving space first.
func (b *int16BufferBuilder) AppendCopies(count int, val int16) {
	b.Reserve(count)
	b.UnsafeWriteCopy(count, (*(*[2]byte)(unsafe.Pointer(&val)))[:])
}

// UnsafeAppend writes v without a capacity check; callers must Reserve.
func (b *int16BufferBuilder) UnsafeAppend(v int16) {
	b.PooledBufferWriter.UnsafeWrite((*(*[2]byte)(unsafe.Pointer(&v)))[:])
}

// Append writes a single value, growing the buffer as needed.
func (b *int16BufferBuilder) Append(v int16) {
	b.PooledBufferWriter.Reserve(arrow.Int16SizeBytes)
	b.PooledBufferWriter.Write((*(*[2]byte)(unsafe.Pointer(&v)))[:])
}
660
// fillRepLevels appends count copies of repLvl — or count-1 when the rep
// level for the first element was already written by an enclosing list
// node (detected via the rep/def length mismatch). A repLvl of -1 means
// the column has no repetition levels, and nothing is written.
func fillRepLevels(count int, repLvl int16, ctx *pathWriteCtx) {
	if repLvl == -1 {
		return
	}

	fillCount := count
	// When rep levels are ahead of def levels, the first element's rep
	// level has already been emitted; avoid writing it twice.
	if !ctx.equalRepDeflevlsLen() {
		fillCount--
	}
	ctx.AppendRepLevels(fillCount, repLvl)
}
676
// writePath drives one leaf path: it walks the node chain depth-first
// over rootRange and produces the def/rep levels for that leaf.
func writePath(rootRange elemRange, info *pathInfo, arrCtx *arrowWriteContext) (multipathLevelResult, error) {
	stack := make([]elemRange, len(info.path))
	buildResult := multipathLevelResult{
		leafArr: info.primitiveArr,
		leafIsNullable: info.leafIsNullable,
	}

	if info.maxDefLevel == 0 {
		// Flat non-nullable column: no levels are required, the whole
		// leaf is written as-is.
		leafLen := buildResult.leafArr.Len()
		buildResult.postListVisitedElems = []elemRange{{0, int64(leafLen)}}
		return buildResult, nil
	}

	stack[0] = rootRange
	// Drop any level buffers left over from a previous leaf.
	if arrCtx.defLevelsBuffer != nil {
		arrCtx.defLevelsBuffer.Release()
		arrCtx.defLevelsBuffer = nil
	}
	if arrCtx.repLevelsBuffer != nil {
		arrCtx.repLevelsBuffer.Release()
		arrCtx.repLevelsBuffer = nil
	}

	ctx := pathWriteCtx{arrCtx.props.mem,
		&int16BufferBuilder{encoding.NewPooledBufferWriter(0)},
		&int16BufferBuilder{encoding.NewPooledBufferWriter(0)},
		make([]elemRange, 0)}

	// At minimum one def level per top-level element will be written.
	ctx.defLevels.Reserve(int(rootRange.size()))
	if info.maxRepLevel > 0 {
		ctx.repLevels.Reserve(int(rootRange.size()))
	}

	// Iterative DFS over the path: node at stackPos consumes from
	// stack[stackPos] and fills stack[stackPos+1]; iterNext (+1) descends
	// to the child, iterDone (-1) pops back to the parent. The loop ends
	// when the root node itself reports iterDone.
	stackBase := 0
	stackPos := stackBase
	for stackPos >= stackBase {
		var res iterResult
		switch n := info.path[stackPos].(type) {
		case *nullableNode:
			res = n.run(&stack[stackPos], &stack[stackPos+1], &ctx)
		case *listNode:
			res = n.run(&stack[stackPos], &stack[stackPos+1], &ctx)
		case *nullableTerminalNode:
			res = n.run(stack[stackPos], &ctx)
		case *allPresentTerminalNode:
			res = n.run(stack[stackPos], &ctx)
		case *allNullsTerminalNode:
			res = n.run(stack[stackPos], &ctx)
		}
		stackPos += int(res)
	}

	if ctx.repLevels.Len() > 0 {
		// Hand over the rep level buffer and swap the visited-range
		// slices (swap rather than copy to avoid an allocation).
		buildResult.repLevels = ctx.repLevels.Values()
		buildResult.repLevelsBuffer = ctx.repLevels.Finish()

		buildResult.postListVisitedElems, ctx.visitedElems = ctx.visitedElems, buildResult.postListVisitedElems

		// A path whose lists were all null/empty never reaches the leaf;
		// record an empty range so downstream slicing still works.
		if len(buildResult.postListVisitedElems) == 0 {
			buildResult.postListVisitedElems = append(buildResult.postListVisitedElems, elemRange{0, 0})
		}
	} else {
		// No repetition: every leaf element was visited exactly once.
		buildResult.postListVisitedElems = append(buildResult.postListVisitedElems, elemRange{0, int64(buildResult.leafArr.Len())})
		buildResult.repLevels = nil
	}

	buildResult.defLevels = ctx.defLevels.Values()
	buildResult.defLevelsBuffer = ctx.defLevels.Finish()
	return buildResult, nil
}
751
View as plain text