1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package pqarrow
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "sync"
24 "sync/atomic"
25
26 "github.com/apache/arrow/go/v15/arrow"
27 "github.com/apache/arrow/go/v15/arrow/array"
28 "github.com/apache/arrow/go/v15/arrow/arrio"
29 "github.com/apache/arrow/go/v15/arrow/memory"
30 "github.com/apache/arrow/go/v15/parquet"
31 "github.com/apache/arrow/go/v15/parquet/file"
32 "github.com/apache/arrow/go/v15/parquet/schema"
33 "golang.org/x/sync/errgroup"
34 "golang.org/x/xerrors"
35 )
36
37 type itrFactory func(int, *file.Reader) *columnIterator
38
// readerCtx carries the state shared by every column reader built for one
// request. It is stored in a context.Context under rdrCtxKey and recovered by
// readerCtxFromContext while getReader walks the field tree.
type readerCtx struct {
	rdr *file.Reader
	mem memory.Allocator
	// colFactory creates the per-leaf page iterators (see rowGroupFactory).
	colFactory itrFactory
	// filterLeaves enables leaf filtering: when true, leaves whose column
	// index is not in includedLeaves get no reader.
	filterLeaves bool
	// includedLeaves holds the leaf column indices to keep; includesLeaf only
	// checks key presence, not the stored value.
	includedLeaves map[int]bool
}
46
47 func (r readerCtx) includesLeaf(idx int) bool {
48 _, ok := r.includedLeaves[idx]
49 return ok
50 }
51
52
53
54
55
56
57
58
59
60 func ReadTable(ctx context.Context, r parquet.ReaderAtSeeker, props *parquet.ReaderProperties, arrProps ArrowReadProperties, mem memory.Allocator) (arrow.Table, error) {
61 pf, err := file.NewParquetReader(r, file.WithReadProps(props))
62 if err != nil {
63 return nil, err
64 }
65
66 reader, err := NewFileReader(pf, arrProps, mem)
67 if err != nil {
68 return nil, err
69 }
70
71 return reader.ReadTable(ctx)
72 }
73
74
75
76
77
78
// FileReader reads data from a parquet file (through a low-level
// *file.Reader) into arrow structures, using Manifest to map parquet columns
// onto arrow fields.
type FileReader struct {
	// mem is the allocator handed to every column reader this FileReader builds.
	mem memory.Allocator
	// rdr is the underlying low-level parquet reader.
	rdr *file.Reader

	// Props holds the arrow read options; this file consults Parallel and
	// BatchSize, among others.
	Props ArrowReadProperties
	// Manifest maps the parquet schema onto arrow fields; built by NewFileReader.
	Manifest *SchemaManifest
}
86
87
88
89
90
91
92 func NewFileReader(rdr *file.Reader, props ArrowReadProperties, mem memory.Allocator) (*FileReader, error) {
93 manifest, err := NewSchemaManifest(rdr.MetaData().Schema, rdr.MetaData().KeyValueMetadata(), &props)
94 if err != nil {
95 return nil, err
96 }
97
98 return &FileReader{
99 mem: mem,
100 rdr: rdr,
101 Props: props,
102 Manifest: manifest,
103 }, nil
104 }
105
106
107 func (fr *FileReader) Schema() (*arrow.Schema, error) {
108 return FromParquet(fr.rdr.MetaData().Schema, &fr.Props, fr.rdr.MetaData().KeyValueMetadata())
109 }
110
111 type colReaderImpl interface {
112 LoadBatch(nrecs int64) error
113 BuildArray(boundedLen int64) (*arrow.Chunked, error)
114 GetDefLevels() ([]int16, error)
115 GetRepLevels() ([]int16, error)
116 Field() *arrow.Field
117 IsOrHasRepeatedChild() bool
118 Retain()
119 Release()
120 }
121
122
123
// ColumnReader reads one arrow field (leaf or nested) from a parquet file.
// All behavior comes from the embedded colReaderImpl.
type ColumnReader struct {
	colReaderImpl
}
127
128
129
130 func (c *ColumnReader) NextBatch(size int64) (*arrow.Chunked, error) {
131 if err := c.LoadBatch(size); err != nil {
132 return nil, err
133 }
134 return c.BuildArray(size)
135 }
136
// rdrCtxKey is the unexported context key under which a readerCtx is stored.
type rdrCtxKey struct{}
138
139 func readerCtxFromContext(ctx context.Context) readerCtx {
140 rdc := ctx.Value(rdrCtxKey{})
141 if rdc != nil {
142 return rdc.(readerCtx)
143 }
144 panic("no readerctx")
145 }
146
147
148 func (fr *FileReader) ParquetReader() *file.Reader { return fr.rdr }
149
150
151
152 func (fr *FileReader) GetColumn(ctx context.Context, i int) (*ColumnReader, error) {
153 return fr.getColumnReader(ctx, i, fr.allRowGroupFactory())
154 }
155
156 func rowGroupFactory(rowGroups []int) itrFactory {
157 return func(i int, rdr *file.Reader) *columnIterator {
158 return &columnIterator{
159 index: i,
160 rdr: rdr,
161 schema: rdr.MetaData().Schema,
162 rowGroups: rowGroups,
163 }
164 }
165 }
166
167 func (fr *FileReader) allRowGroupFactory() itrFactory {
168 rowGroups := make([]int, fr.rdr.NumRowGroups())
169 for idx := range rowGroups {
170 rowGroups[idx] = idx
171 }
172 return rowGroupFactory(rowGroups)
173 }
174
175
176
177
178
179 func (fr *FileReader) GetFieldReader(ctx context.Context, i int, includedLeaves map[int]bool, rowGroups []int) (*ColumnReader, error) {
180 ctx = context.WithValue(ctx, rdrCtxKey{}, readerCtx{
181 rdr: fr.rdr,
182 mem: fr.mem,
183 colFactory: rowGroupFactory(rowGroups),
184 filterLeaves: true,
185 includedLeaves: includedLeaves,
186 })
187 return fr.getReader(ctx, &fr.Manifest.Fields[i], *fr.Manifest.Fields[i].Field)
188 }
189
190
191
192
193 func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups []int) ([]*ColumnReader, *arrow.Schema, error) {
194 fieldIndices, err := fr.Manifest.GetFieldIndices(colIndices)
195 if err != nil {
196 return nil, nil, err
197 }
198
199 includedLeaves := make(map[int]bool)
200 for _, col := range colIndices {
201 includedLeaves[col] = true
202 }
203
204 out := make([]*ColumnReader, len(fieldIndices))
205 outFields := make([]arrow.Field, len(fieldIndices))
206
207
208
209
210
211
212
213 g, gctx := errgroup.WithContext(ctx)
214 if !fr.Props.Parallel {
215 g.SetLimit(1)
216 }
217 for idx, fidx := range fieldIndices {
218 idx, fidx := idx, fidx
219 g.Go(func() error {
220 rdr, err := fr.GetFieldReader(gctx, fidx, includedLeaves, rowGroups)
221 if err != nil {
222 return err
223 }
224 outFields[idx] = *rdr.Field()
225 out[idx] = rdr
226 return nil
227 })
228 }
229 if err = g.Wait(); err != nil {
230 return nil, nil, err
231 }
232
233 return out, arrow.NewSchema(outFields, fr.Manifest.SchemaMeta), nil
234 }
235
236
237 func (fr *FileReader) RowGroup(idx int) RowGroupReader {
238 return RowGroupReader{fr, idx}
239 }
240
241
242 func (fr *FileReader) ReadColumn(rowGroups []int, rdr *ColumnReader) (*arrow.Chunked, error) {
243 recs := int64(0)
244 for _, rg := range rowGroups {
245 recs += fr.rdr.MetaData().RowGroups[rg].GetNumRows()
246 }
247 return rdr.NextBatch(recs)
248 }
249
250
251 func (fr *FileReader) ReadTable(ctx context.Context) (arrow.Table, error) {
252 var (
253 cols = []int{}
254 rgs = []int{}
255 )
256 for i := 0; i < fr.rdr.MetaData().Schema.NumColumns(); i++ {
257 cols = append(cols, i)
258 }
259 for i := 0; i < fr.rdr.NumRowGroups(); i++ {
260 rgs = append(rgs, i)
261 }
262 return fr.ReadRowGroups(ctx, cols, rgs)
263 }
264
265 func (fr *FileReader) checkCols(indices []int) (err error) {
266 for _, col := range indices {
267 if col < 0 || col >= fr.rdr.MetaData().Schema.NumColumns() {
268 err = fmt.Errorf("invalid column index specified %d out of %d", col, fr.rdr.MetaData().Schema.NumColumns())
269 break
270 }
271 }
272 return
273 }
274
275 func (fr *FileReader) checkRowGroups(indices []int) (err error) {
276 for _, rg := range indices {
277 if rg < 0 || rg >= fr.rdr.NumRowGroups() {
278 err = fmt.Errorf("invalid row group specified: %d, file only has %d row groups", rg, fr.rdr.NumRowGroups())
279 break
280 }
281 }
282 return
283 }
284
// readerInfo is a unit of work for the ReadRowGroups workers: a field reader
// and the position of its column in the output schema.
type readerInfo struct {
	rdr *ColumnReader
	idx int
}
289
// resultPair is a worker's reply in ReadRowGroups. It carries either the
// column read for output position idx, or the error hit while reading it.
type resultPair struct {
	idx  int
	data *arrow.Chunked
	err  error
}
295
296
297
298
299
300 func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int) (arrow.Table, error) {
301 if err := fr.checkRowGroups(rowGroups); err != nil {
302 return nil, err
303 }
304 if err := fr.checkCols(indices); err != nil {
305 return nil, err
306 }
307
308
309
310 readers, sc, err := fr.GetFieldReaders(ctx, indices, rowGroups)
311 if err != nil {
312 return nil, err
313 }
314
315
316 var (
317 np = 1
318 wg sync.WaitGroup
319 ch = make(chan readerInfo, len(readers))
320 results = make(chan resultPair, 2)
321 )
322
323 if fr.Props.Parallel {
324 np = len(readers)
325 }
326
327 ctx, cancel := context.WithCancel(ctx)
328 defer cancel()
329
330 wg.Add(np)
331 for i := 0; i < np; i++ {
332 go func() {
333 defer wg.Done()
334 for {
335 select {
336 case r, ok := <-ch:
337 if !ok {
338 return
339 }
340
341 chnked, err := fr.ReadColumn(rowGroups, r.rdr)
342
343
344 results <- resultPair{r.idx, chnked, err}
345 case <-ctx.Done():
346 return
347 }
348 }
349 }()
350 }
351
352 go func() {
353 wg.Wait()
354 close(results)
355 }()
356
357
358
359 for idx := range readers {
360 defer readers[idx].Release()
361 ch <- readerInfo{readers[idx], idx}
362 }
363 close(ch)
364
365
366 columns := make([]arrow.Column, sc.NumFields())
367 defer releaseColumns(columns)
368 for data := range results {
369 if data.err != nil {
370 err = data.err
371 cancel()
372 break
373 }
374 columns[data.idx] = *arrow.NewColumn(sc.Field(data.idx), data.data)
375 data.data.Release()
376 }
377
378 if err != nil {
379
380
381
382 for data := range results {
383 data.data.Release()
384 }
385 return nil, err
386 }
387
388 var nrows int
389 if len(columns) > 0 {
390 nrows = columns[0].Len()
391 }
392
393 return array.NewTable(sc, columns, int64(nrows)), nil
394 }
395
396 func (fr *FileReader) getColumnReader(ctx context.Context, i int, colFactory itrFactory) (*ColumnReader, error) {
397 if i < 0 || i >= len(fr.Manifest.Fields) {
398 return nil, fmt.Errorf("invalid column index chosen %d, there are only %d columns", i, len(fr.Manifest.Fields))
399 }
400
401 ctx = context.WithValue(ctx, rdrCtxKey{}, readerCtx{
402 rdr: fr.rdr,
403 mem: fr.mem,
404 colFactory: colFactory,
405 filterLeaves: false,
406 })
407
408 return fr.getReader(ctx, &fr.Manifest.Fields[i], *fr.Manifest.Fields[i].Field)
409 }
410
411
412
413
// RecordReader is the interface returned by GetRecordReader. It combines the
// iterator-style array.RecordReader (Next/Record/Err) with the arrio.Reader
// Read method.
type RecordReader interface {
	array.RecordReader
	arrio.Reader
}
418
419
420
421
422 func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups []int) (RecordReader, error) {
423 if err := fr.checkRowGroups(rowGroups); err != nil {
424 return nil, err
425 }
426
427 if rowGroups == nil {
428 rowGroups = make([]int, fr.rdr.NumRowGroups())
429 for idx := range rowGroups {
430 rowGroups[idx] = idx
431 }
432 }
433
434 if err := fr.checkCols(colIndices); err != nil {
435 return nil, err
436 }
437
438 if colIndices == nil {
439 colIndices = make([]int, fr.rdr.MetaData().Schema.NumColumns())
440 for idx := range colIndices {
441 colIndices[idx] = idx
442 }
443 }
444
445
446
447 readers, sc, err := fr.GetFieldReaders(ctx, colIndices, rowGroups)
448 if err != nil {
449 return nil, err
450 }
451
452 if len(readers) == 0 {
453 return nil, xerrors.New("no leaf column readers matched col indices")
454 }
455
456 nrows := int64(0)
457 for _, rg := range rowGroups {
458 nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
459 }
460
461 return &recordReader{
462 numRows: nrows,
463 batchSize: fr.Props.BatchSize,
464 parallel: fr.Props.Parallel,
465 sc: sc,
466 fieldReaders: readers,
467 refCount: 1,
468 }, nil
469 }
470
471 func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowField arrow.Field) (out *ColumnReader, err error) {
472 rctx := readerCtxFromContext(ctx)
473 if len(field.Children) == 0 {
474 if !field.IsLeaf() {
475 return nil, xerrors.New("parquet non-leaf node has no children")
476 }
477 if rctx.filterLeaves && !rctx.includesLeaf(field.ColIndex) {
478 return nil, nil
479 }
480
481 out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool())
482 return
483 }
484
485 switch arrowField.Type.ID() {
486 case arrow.EXTENSION:
487 return nil, xerrors.New("extension type not implemented")
488 case arrow.STRUCT:
489
490 childReaders := make([]*ColumnReader, len(field.Children))
491 childFields := make([]arrow.Field, len(field.Children))
492
493
494
495
496
497
498 g, gctx := errgroup.WithContext(ctx)
499 if !fr.Props.Parallel {
500 g.SetLimit(1)
501 }
502
503 for n, child := range field.Children {
504 n, child := n, child
505 g.Go(func() error {
506 reader, err := fr.getReader(gctx, &child, *child.Field)
507 if err != nil {
508 return err
509 }
510 if reader == nil {
511 return nil
512 }
513 childFields[n] = *child.Field
514 childReaders[n] = reader
515 return nil
516 })
517 }
518 if err = g.Wait(); err != nil {
519 return nil, err
520 }
521
522
523 for n := len(childReaders) - 1; n >= 0; n-- {
524 if childReaders[n] == nil {
525 childReaders = append(childReaders[:n], childReaders[n+1:]...)
526 childFields = append(childFields[:n], childFields[n+1:]...)
527 }
528 }
529 if len(childFields) == 0 {
530 return nil, nil
531 }
532 filtered := arrow.Field{Name: arrowField.Name, Nullable: arrowField.Nullable,
533 Metadata: arrowField.Metadata, Type: arrow.StructOf(childFields...)}
534 out = newStructReader(&rctx, &filtered, field.LevelInfo, childReaders, fr.Props)
535 case arrow.LIST, arrow.FIXED_SIZE_LIST, arrow.MAP:
536 child := field.Children[0]
537 childReader, err := fr.getReader(ctx, &child, *child.Field)
538 if err != nil {
539 return nil, err
540 }
541 if childReader == nil {
542 return nil, nil
543 }
544 defer childReader.Release()
545
546 switch arrowField.Type.(type) {
547 case *arrow.MapType:
548 if len(child.Children) != 2 {
549 arrowField.Type = arrow.ListOf(childReader.Field().Type)
550 }
551 out = newListReader(&rctx, &arrowField, field.LevelInfo, childReader, fr.Props)
552 case *arrow.ListType:
553 out = newListReader(&rctx, &arrowField, field.LevelInfo, childReader, fr.Props)
554 case *arrow.FixedSizeListType:
555 out = newFixedSizeListReader(&rctx, &arrowField, field.LevelInfo, childReader, fr.Props)
556 default:
557 return nil, fmt.Errorf("unknown list type: %s", field.Field.String())
558 }
559 }
560 return
561 }
562
563
564
// RowGroupReader reads data from a single row group of a FileReader.
// It is built with a positional literal (see FileReader.RowGroup), so the
// field order matters.
type RowGroupReader struct {
	impl *FileReader
	// idx is the row group index; it is not validated by FileReader.RowGroup.
	idx int
}
569
570
571 func (rgr RowGroupReader) ReadTable(ctx context.Context, colIndices []int) (arrow.Table, error) {
572 return rgr.impl.ReadRowGroups(ctx, colIndices, []int{rgr.idx})
573 }
574
575
576 func (rgr RowGroupReader) Column(idx int) ColumnChunkReader {
577 return ColumnChunkReader{rgr.impl, idx, rgr.idx}
578 }
579
580
581
// ColumnChunkReader reads one top-level field from a single row group.
// It is built with a positional literal (see RowGroupReader.Column), so the
// field order matters.
type ColumnChunkReader struct {
	impl *FileReader
	// idx is the index of the top-level field in impl.Manifest.Fields.
	idx int
	// rowGroup is the row group to read from.
	rowGroup int
}
587
588 func (ccr ColumnChunkReader) Read(ctx context.Context) (*arrow.Chunked, error) {
589 rdr, err := ccr.impl.getColumnReader(ctx, ccr.idx, rowGroupFactory([]int{ccr.rowGroup}))
590 if err != nil {
591 return nil, err
592 }
593 return ccr.impl.ReadColumn([]int{ccr.rowGroup}, rdr)
594 }
595
// columnIterator yields the page readers for a single leaf column, one row
// group at a time.
type columnIterator struct {
	// index is the leaf column index within the parquet schema.
	index  int
	rdr    *file.Reader
	schema *schema.Schema
	// rowGroups holds the row groups still to visit; NextChunk consumes them
	// from the front.
	rowGroups []int
}
602
603 func (c *columnIterator) NextChunk() (file.PageReader, error) {
604 if len(c.rowGroups) == 0 {
605 return nil, nil
606 }
607
608 rgr := c.rdr.RowGroup(c.rowGroups[0])
609 c.rowGroups = c.rowGroups[1:]
610 return rgr.GetColumnPageReader(c.index)
611 }
612
613 func (c *columnIterator) Descr() *schema.Column { return c.schema.Column(c.index) }
614
615
616
// recordReader implements RecordReader. It pulls one batch at a time from
// each of its field readers and combines the results into arrow.Records.
type recordReader struct {
	// numRows is the total row count of the selected row groups (set by GetRecordReader).
	numRows int64
	// batchSize is the number of rows requested from each field reader per record.
	batchSize int64
	// parallel makes next read the fields concurrently.
	parallel bool
	sc       *arrow.Schema
	// fieldReaders has one reader per field of sc; they are released by Release.
	fieldReaders []*ColumnReader
	// cur is the current record, released on the next Next/Read or on Release.
	cur arrow.Record
	// err is the error that stopped iteration; io.EOF marks the end of the data.
	err error

	// refCount is updated atomically by Retain and Release.
	refCount int64
}
628
629 func (r *recordReader) Retain() {
630 atomic.AddInt64(&r.refCount, 1)
631 }
632
633 func (r *recordReader) Release() {
634 if atomic.AddInt64(&r.refCount, -1) == 0 {
635 if r.cur != nil {
636 r.cur.Release()
637 r.cur = nil
638 }
639 if r.fieldReaders == nil {
640 return
641 }
642 for _, fr := range r.fieldReaders {
643 fr.Release()
644 }
645 r.fieldReaders = nil
646 }
647 }
648
649 func (r *recordReader) Schema() *arrow.Schema { return r.sc }
650
// next reads one batch from every field reader, requesting r.batchSize rows
// from each. On success it stores the assembled record in r.cur and returns
// true. On failure it stores the error in r.err and returns false. A field
// reader that yields an empty batch is reported as io.EOF; that is how the end
// of the data is signalled.
//
// When r.parallel is false the fields are read sequentially in order.
// Otherwise one goroutine is started per field, and the first error reported
// wins.
func (r *recordReader) next() bool {
	cols := make([]arrow.Array, len(r.sc.Fields()))
	// Release this function's references to the per-field arrays on return.
	// The assembled record is presumed to retain the ones it keeps (TODO confirm).
	defer releaseArrays(cols)
	// readField reads the next batch for the field at idx, flattens it into a
	// single array and stores that array in cols[idx].
	readField := func(idx int, rdr *ColumnReader) error {
		data, err := rdr.NextBatch(r.batchSize)
		if err != nil {
			return err
		}
		defer data.Release()

		// No rows left for this field: signal end of data.
		if data.Len() == 0 {
			return io.EOF
		}

		arrdata, err := chunksToSingle(data)
		if err != nil {
			return err
		}
		defer arrdata.Release()

		cols[idx] = array.MakeFromData(arrdata)
		return nil
	}

	if !r.parallel {
		for idx, rdr := range r.fieldReaders {
			if err := readField(idx, rdr); err != nil {
				r.err = err
				return false
			}
		}

		// -1 leaves the row count for array.NewRecord to determine
		// (presumably from the columns).
		r.cur = array.NewRecord(r.sc, cols, -1)
		return true
	}

	// Parallel path: one worker per field. ch and errch are buffered for
	// every field, so neither the producer loop nor error sends can block.
	var (
		wg    sync.WaitGroup
		np    = len(cols)
		ch    = make(chan int, np)
		errch = make(chan error, np)
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg.Add(np)
	for i := 0; i < np; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case idx, ok := <-ch:
					if !ok {
						return
					}

					// On failure, report it and stop the other workers.
					if err := readField(idx, r.fieldReaders[idx]); err != nil {
						errch <- err
						cancel()
						return
					}

				case <-ctx.Done():
					return
				}
			}
		}()
	}

	for idx := range r.fieldReaders {
		ch <- idx
	}
	close(ch)
	wg.Wait()
	close(errch)

	var ok bool

	// Keep the first error received (not necessarily the lowest field index).
	// If errch is empty this assigns nil to r.err and ok is false.
	if r.err, ok = <-errch; ok {
		// Discard any further errors.
		for range errch {
		}
		return false
	}

	r.cur = array.NewRecord(r.sc, cols, -1)
	return true
}
741
742 func (r *recordReader) Next() bool {
743 if r.cur != nil {
744 r.cur.Release()
745 r.cur = nil
746 }
747
748 if r.err != nil {
749 return false
750 }
751
752 return r.next()
753 }
754
755 func (r *recordReader) Record() arrow.Record { return r.cur }
756
757 func (r *recordReader) Err() error { return r.err }
758
759 func (r *recordReader) Read() (arrow.Record, error) {
760 if r.cur != nil {
761 r.cur.Release()
762 r.cur = nil
763 }
764
765 if !r.next() {
766 return nil, r.err
767 }
768
769 return r.cur, nil
770 }
771
View as plain text