package main

import (
	"encoding/binary"
	"fmt"
	"reflect"
	"time"

	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/schema"
)

const defaultBatchSize = 128

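// Dumper wraps a single ColumnChunkReader and buffers batches of
// definition levels, repetition levels, and typed values so callers can
// iterate over the column one value at a time via Next.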
type Dumper struct {
	reader         file.ColumnChunkReader
	batchSize      int64
	valueOffset    int
	valuesBuffered int

	levelOffset    int64
	levelsBuffered int64
	defLevels      []int16
	repLevels      []int16

	valueBuffer interface{}
}

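// createDumper builds a Dumper for the given column chunk reader, sizing
// the level and value buffers to defaultBatchSize.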
func createDumper(reader file.ColumnChunkReader) *Dumper {
	batchSize := defaultBatchSize

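	// Allocate a value buffer whose element type matches the column's
	// physical type, as indicated by the concrete reader type.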
	var valueBuffer interface{}
	switch reader.(type) {
	case *file.BooleanColumnChunkReader:
		valueBuffer = make([]bool, batchSize)
	case *file.Int32ColumnChunkReader:
		valueBuffer = make([]int32, batchSize)
	case *file.Int64ColumnChunkReader:
		valueBuffer = make([]int64, batchSize)
	case *file.Float32ColumnChunkReader:
		valueBuffer = make([]float32, batchSize)
	case *file.Float64ColumnChunkReader:
		valueBuffer = make([]float64, batchSize)
	case *file.Int96ColumnChunkReader:
		valueBuffer = make([]parquet.Int96, batchSize)
	case *file.ByteArrayColumnChunkReader:
		valueBuffer = make([]parquet.ByteArray, batchSize)
	case *file.FixedLenByteArrayColumnChunkReader:
		valueBuffer = make([]parquet.FixedLenByteArray, batchSize)
	}

	return &Dumper{
		reader:      reader,
		batchSize:   int64(batchSize),
		defLevels:   make([]int16, batchSize),
		repLevels:   make([]int16, batchSize),
		valueBuffer: valueBuffer,
	}
}

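// readNextBatch reads up to batchSize levels and values from the column
// into the Dumper's buffers and resets the read offsets.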
func (dump *Dumper) readNextBatch() {
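	// Each concrete reader type exposes its own typed ReadBatch; the
	// error return is discarded here.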
	switch reader := dump.reader.(type) {
	case *file.BooleanColumnChunkReader:
		values := dump.valueBuffer.([]bool)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.Int32ColumnChunkReader:
		values := dump.valueBuffer.([]int32)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.Int64ColumnChunkReader:
		values := dump.valueBuffer.([]int64)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.Float32ColumnChunkReader:
		values := dump.valueBuffer.([]float32)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.Float64ColumnChunkReader:
		values := dump.valueBuffer.([]float64)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.Int96ColumnChunkReader:
		values := dump.valueBuffer.([]parquet.Int96)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.ByteArrayColumnChunkReader:
		values := dump.valueBuffer.([]parquet.ByteArray)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	case *file.FixedLenByteArrayColumnChunkReader:
		values := dump.valueBuffer.([]parquet.FixedLenByteArray)
		dump.levelsBuffered, dump.valuesBuffered, _ = reader.ReadBatch(dump.batchSize, values, dump.defLevels, dump.repLevels)
	}

	dump.valueOffset = 0
	dump.levelOffset = 0
}

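// hasNext reports whether unread values remain, either already buffered
// or still available from the underlying reader.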
func (dump *Dumper) hasNext() bool {
	return dump.levelOffset < dump.levelsBuffered || dump.reader.HasNext()
}

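// microSecondsPerDay converts a day count to microseconds when decoding
// Int96 timestamps. parseInt96AsTimestamp selects between printing Int96
// values as timestamps or as raw 32-bit integer triples.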
const microSecondsPerDay = 24 * 3600e6

var parseInt96AsTimestamp = false

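// FormatValue renders a single value as a left-aligned string of the
// given width, dispatching on the value's dynamic type.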
func (dump *Dumper) FormatValue(val interface{}, width int) string {
	fmtstring := fmt.Sprintf("-%d", width)
	switch val := val.(type) {
	case nil:
		return fmt.Sprintf("%"+fmtstring+"s", "NULL")
	case bool:
		return fmt.Sprintf("%"+fmtstring+"t", val)
	case int32:
		return fmt.Sprintf("%"+fmtstring+"d", val)
	case int64:
		return fmt.Sprintf("%"+fmtstring+"d", val)
	case float32:
		return fmt.Sprintf("%"+fmtstring+"f", val)
	case float64:
		return fmt.Sprintf("%"+fmtstring+"f", val)
	case parquet.Int96:
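		// An Int96 packs nanoseconds within the day into its first 8
		// bytes and a Julian day number into its last 4; subtracting
		// 2440588 (the Julian day of the Unix epoch) yields days since
		// 1970-01-01.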
		if parseInt96AsTimestamp {
			usec := int64(binary.LittleEndian.Uint64(val[:8])/1000) +
				(int64(binary.LittleEndian.Uint32(val[8:]))-2440588)*microSecondsPerDay
			t := time.Unix(usec/1e6, (usec%1e6)*1e3).UTC()
			return fmt.Sprintf("%"+fmtstring+"s", t)
		} else {
			return fmt.Sprintf("%"+fmtstring+"s",
				fmt.Sprintf("%d %d %d",
					binary.LittleEndian.Uint32(val[:4]),
					binary.LittleEndian.Uint32(val[4:]),
					binary.LittleEndian.Uint32(val[8:])))
		}
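	// UTF8-annotated byte arrays print as strings; other binary data
	// prints as space-separated hex.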
	case parquet.ByteArray:
		if dump.reader.Descriptor().ConvertedType() == schema.ConvertedTypes.UTF8 {
			return fmt.Sprintf("%"+fmtstring+"s", string(val))
		}
		return fmt.Sprintf("% "+fmtstring+"X", val)
	case parquet.FixedLenByteArray:
		return fmt.Sprintf("% "+fmtstring+"X", val)
	default:
		return fmt.Sprintf("%"+fmtstring+"s", fmt.Sprintf("%v", val))
	}
}

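// Next returns the next value in the column chunk along with an ok flag;
// ok is false once the column is exhausted. A nil value with ok true
// indicates a null entry.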
func (dump *Dumper) Next() (interface{}, bool) {
	if dump.levelOffset == dump.levelsBuffered {
		if !dump.hasNext() {
			return nil, false
		}
		dump.readNextBatch()
		if dump.levelsBuffered == 0 {
			return nil, false
		}
	}

	defLevel := dump.defLevels[int(dump.levelOffset)]

	dump.levelOffset++

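	// A definition level below the column's maximum means the value is
	// null at this position.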
	if defLevel < dump.reader.Descriptor().MaxDefinitionLevel() {
		return nil, true
	}

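	// Pull the next value from the typed buffer via reflection, since
	// valueBuffer's concrete slice type varies by column.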
	vb := reflect.ValueOf(dump.valueBuffer)
	v := vb.Index(dump.valueOffset).Interface()
	dump.valueOffset++

	return v, true
}