17 package main
18
19 import (
20 "bufio"
21 "fmt"
22 "io"
23 "log"
24 "os"
25 "strconv"
26 "strings"
27
28 "github.com/apache/arrow/go/v15/internal/json"
29 "github.com/apache/arrow/go/v15/parquet"
30 "github.com/apache/arrow/go/v15/parquet/file"
31 "github.com/apache/arrow/go/v15/parquet/metadata"
32 "github.com/apache/arrow/go/v15/parquet/schema"
33
34 "github.com/docopt/docopt-go"
35 )
36
// version is the tool's release version string; it is intended to be
// injected at build time (e.g. -ldflags "-X main.version=...") and is
// empty for plain `go build` binaries.
var version = ""

// usage is the docopt grammar for the CLI. docopt parses this text
// directly, so the indentation of the Usage patterns and the two-plus
// spaces between each option and its description are significant.
var usage = `Parquet Reader (version ` + version + `)
Usage:
  parquet_reader -h | --help
  parquet_reader [--only-metadata] [--no-metadata] [--no-memory-map] [--json] [--csv] [--output=FILE]
                 [--print-key-value-metadata] [--int96-timestamp] [--columns=COLUMNS] <file>
Options:
  -h --help                     Show this screen.
  --print-key-value-metadata    Print out the key-value metadata. [default: false]
  --only-metadata               Stop after printing metadata, no values.
  --no-metadata                 Do not print metadata.
  --output=FILE                 Specify output file for data. [default: -]
  --no-memory-map               Disable memory mapping the file.
  --int96-timestamp             Parse INT96 as TIMESTAMP for legacy support.
  --json                        Format output as JSON instead of text.
  --csv                         Format output as CSV instead of text.
  --columns=COLUMNS             Specify a subset of columns to print, comma delimited indexes.`
54
55 func main() {
56 opts, _ := docopt.ParseDoc(usage)
57 var config struct {
58 PrintKeyValueMetadata bool
59 OnlyMetadata bool
60 NoMetadata bool
61 Output string
62 NoMemoryMap bool
63 JSON bool `docopt:"--json"`
64 CSV bool `docopt:"--csv"`
65 ParseInt96AsTimestamp bool `docopt:"--int96-timestamp"`
66 Columns string
67 File string
68 }
69 opts.Bind(&config)
70
71 parseInt96AsTimestamp = config.ParseInt96AsTimestamp
72
73 var dataOut io.Writer
74 dataOut = os.Stdout
75 if config.Output != "-" {
76 var err error
77 fileOut, err := os.Create(config.Output)
78 if err != nil {
79 fmt.Fprintf(os.Stderr, "error: --output %q cannot be created, %s\n", config.Output, err)
80 os.Exit(1)
81 }
82 bufOut := bufio.NewWriter(fileOut)
83 defer func() {
84 bufOut.Flush()
85 fileOut.Close()
86 }()
87 dataOut = bufOut
88 }
89
90 if config.CSV && config.JSON {
91 fmt.Fprintln(os.Stderr, "error: both --json and --csv outputs selected.")
92 os.Exit(1)
93 }
94
95 selectedColumns := []int{}
96 if config.Columns != "" {
97 for _, c := range strings.Split(config.Columns, ",") {
98 cval, err := strconv.Atoi(c)
99 if err != nil {
100 fmt.Fprintln(os.Stderr, "error: --columns needs to be comma-delimited integers")
101 os.Exit(1)
102 }
103 selectedColumns = append(selectedColumns, cval)
104 }
105 }
106
107 rdr, err := file.OpenParquetFile(config.File, !config.NoMemoryMap)
108 if err != nil {
109 fmt.Fprintln(os.Stderr, "error opening parquet file: ", err)
110 os.Exit(1)
111 }
112
113 fileMetadata := rdr.MetaData()
114
115 if !config.NoMetadata {
116 fmt.Println("File name:", config.File)
117 fmt.Println("Version:", fileMetadata.Version())
118 fmt.Println("Created By:", fileMetadata.GetCreatedBy())
119 fmt.Println("Num Rows:", rdr.NumRows())
120
121 keyvaluemeta := fileMetadata.KeyValueMetadata()
122 if config.PrintKeyValueMetadata && keyvaluemeta != nil {
123 fmt.Println("Key Value File Metadata:", keyvaluemeta.Len(), "entries")
124 keys := keyvaluemeta.Keys()
125 values := keyvaluemeta.Values()
126 for i := 0; i < keyvaluemeta.Len(); i++ {
127 fmt.Printf("Key nr %d %s: %s\n", i, keys[i], values[i])
128 }
129 }
130
131 fmt.Println("Number of RowGroups:", rdr.NumRowGroups())
132 fmt.Println("Number of Real Columns:", fileMetadata.Schema.Root().NumFields())
133 fmt.Println("Number of Columns:", fileMetadata.Schema.NumColumns())
134 }
135
136 if len(selectedColumns) == 0 {
137 for i := 0; i < fileMetadata.Schema.NumColumns(); i++ {
138 selectedColumns = append(selectedColumns, i)
139 }
140 } else {
141 for _, c := range selectedColumns {
142 if c < 0 || c >= fileMetadata.Schema.NumColumns() {
143 fmt.Fprintln(os.Stderr, "selected column is out of range")
144 os.Exit(1)
145 }
146 }
147 }
148
149 if !config.NoMetadata {
150 fmt.Println("Number of Selected Columns:", len(selectedColumns))
151 for _, c := range selectedColumns {
152 descr := fileMetadata.Schema.Column(c)
153 fmt.Printf("Column %d: %s (%s", c, descr.Path(), descr.PhysicalType())
154 if descr.ConvertedType() != schema.ConvertedTypes.None {
155 fmt.Printf("/%s", descr.ConvertedType())
156 if descr.ConvertedType() == schema.ConvertedTypes.Decimal {
157 dec := descr.LogicalType().(*schema.DecimalLogicalType)
158 fmt.Printf("(%d,%d)", dec.Precision(), dec.Scale())
159 }
160 }
161 fmt.Print(")\n")
162 }
163 }
164
165 for r := 0; r < rdr.NumRowGroups(); r++ {
166 if !config.NoMetadata {
167 fmt.Println("--- Row Group:", r, " ---")
168 }
169
170 rgr := rdr.RowGroup(r)
171 rowGroupMeta := rgr.MetaData()
172 if !config.NoMetadata {
173 fmt.Println("--- Total Bytes:", rowGroupMeta.TotalByteSize(), " ---")
174 fmt.Println("--- Rows:", rgr.NumRows(), " ---")
175 }
176
177 for _, c := range selectedColumns {
178 chunkMeta, err := rowGroupMeta.ColumnChunk(c)
179 if err != nil {
180 log.Fatal(err)
181 }
182
183 if !config.NoMetadata {
184 fmt.Println("Column", c)
185 if set, _ := chunkMeta.StatsSet(); set {
186 stats, err := chunkMeta.Statistics()
187 if err != nil {
188 log.Fatal(err)
189 }
190 fmt.Printf(" Values: %d", chunkMeta.NumValues())
191 if stats.HasMinMax() {
192 fmt.Printf(", Min: %v, Max: %v",
193 metadata.GetStatValue(stats.Type(), stats.EncodeMin()),
194 metadata.GetStatValue(stats.Type(), stats.EncodeMax()))
195 }
196 if stats.HasNullCount() {
197 fmt.Printf(", Null Values: %d", stats.NullCount())
198 }
199 if stats.HasDistinctCount() {
200 fmt.Printf(", Distinct Values: %d", stats.DistinctCount())
201 }
202 fmt.Println()
203 } else {
204 fmt.Println(" Values:", chunkMeta.NumValues(), "Statistics Not Set")
205 }
206
207 fmt.Print(" Compression: ", chunkMeta.Compression())
208 fmt.Print(", Encodings:")
209 for _, enc := range chunkMeta.Encodings() {
210 fmt.Print(" ", enc)
211 }
212 fmt.Println()
213
214 fmt.Print(" Uncompressed Size: ", chunkMeta.TotalUncompressedSize())
215 fmt.Println(", Compressed Size:", chunkMeta.TotalCompressedSize())
216 }
217 }
218
219 if config.OnlyMetadata {
220 continue
221 }
222
223 if !config.NoMetadata {
224 fmt.Println("--- Values ---")
225 }
226
227 switch {
228 case config.JSON:
229 fmt.Fprint(dataOut, "[")
230
231 scanners := make([]*Dumper, len(selectedColumns))
232 fields := make([]string, len(selectedColumns))
233 for idx, c := range selectedColumns {
234 col, err := rgr.Column(c)
235 if err != nil {
236 log.Fatalf("unable to fetch column=%d err=%s", c, err)
237 }
238 scanners[idx] = createDumper(col)
239 fields[idx] = col.Descriptor().Path()
240 }
241
242 var line string
243 for {
244 if line == "" {
245 line = "\n {"
246 } else {
247 line = ",\n {"
248 }
249
250 data := false
251 first := true
252 for idx, s := range scanners {
253 if val, ok := s.Next(); ok {
254 if !data {
255 fmt.Fprint(dataOut, line)
256 }
257 data = true
258 if val == nil {
259 continue
260 }
261 if !first {
262 fmt.Fprint(dataOut, ",")
263 }
264 first = false
265 switch val.(type) {
266 case bool, int32, int64, float32, float64:
267 default:
268 val = s.FormatValue(val, 0)
269 }
270 jsonVal, err := json.Marshal(val)
271 if err != nil {
272 fmt.Fprintf(os.Stderr, "error: marshalling json for %+v, %s\n", val, err)
273 os.Exit(1)
274 }
275 fmt.Fprintf(dataOut, "\n %q: %s", fields[idx], jsonVal)
276 }
277 }
278 if !data {
279 break
280 }
281 fmt.Fprint(dataOut, "\n }")
282 }
283
284 fmt.Fprintln(dataOut, "\n]")
285 case config.CSV:
286 scanners := make([]*Dumper, len(selectedColumns))
287 for idx, c := range selectedColumns {
288 if idx > 0 {
289 fmt.Fprint(dataOut, ",")
290 }
291 col, err := rgr.Column(c)
292 if err != nil {
293 log.Fatalf("unable to fetch col=%d err=%s", c, err)
294 }
295 scanners[idx] = createDumper(col)
296 fmt.Fprintf(dataOut, "%q", col.Descriptor().Path())
297 }
298 fmt.Fprintln(dataOut)
299
300 var line string
301 for {
302 data := false
303 for idx, s := range scanners {
304 if idx > 0 {
305 if data {
306 fmt.Fprint(dataOut, ",")
307 } else {
308 line += ","
309 }
310 }
311 if val, ok := s.Next(); ok {
312 if !data {
313 fmt.Fprint(dataOut, line)
314 }
315 data = true
316 if val == nil {
317 fmt.Fprint(dataOut, "")
318 continue
319 }
320 switch val.(type) {
321 case bool, int32, int64, parquet.Int96, float32, float64:
322 fmt.Fprintf(dataOut, "%v", val)
323 default:
324 fmt.Fprintf(dataOut, "%q", s.FormatValue(val, 0))
325 }
326 } else {
327 if data {
328 fmt.Fprint(dataOut, ",")
329 } else {
330 line += ","
331 }
332 }
333 }
334 if !data {
335 break
336 }
337 fmt.Fprintln(dataOut)
338 line = ""
339 }
340 fmt.Fprintln(dataOut)
341 default:
342 const colwidth = 18
343
344 scanners := make([]*Dumper, len(selectedColumns))
345 for idx, c := range selectedColumns {
346 col, err := rgr.Column(c)
347 if err != nil {
348 log.Fatalf("unable to fetch column=%d err=%s", c, err)
349 }
350 scanners[idx] = createDumper(col)
351 fmt.Fprintf(dataOut, fmt.Sprintf("%%-%ds|", colwidth), col.Descriptor().Name())
352 }
353 fmt.Fprintln(dataOut)
354
355 var line string
356 for {
357 data := false
358 for _, s := range scanners {
359 if val, ok := s.Next(); ok {
360 if !data {
361 fmt.Fprint(dataOut, line)
362 }
363 fmt.Fprint(dataOut, s.FormatValue(val, colwidth), "|")
364 data = true
365 } else {
366 if data {
367 fmt.Fprintf(dataOut, fmt.Sprintf("%%-%ds|", colwidth), "")
368 } else {
369 line += fmt.Sprintf(fmt.Sprintf("%%-%ds|", colwidth), "")
370 }
371 }
372 }
373 if !data {
374 break
375 }
376 fmt.Fprintln(dataOut)
377 line = ""
378 }
379 fmt.Fprintln(dataOut)
380 }
381 }
382 }
383