...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package metadata
18
19 import (
20 "fmt"
21 "reflect"
22
23 "github.com/apache/arrow/go/v15/parquet"
24 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
25 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
26 "github.com/apache/arrow/go/v15/parquet/schema"
27 )
28
29
30 type RowGroupMetaData struct {
31 rowGroup *format.RowGroup
32 Schema *schema.Schema
33 version *AppVersion
34 fileDecryptor encryption.FileDecryptor
35 }
36
37
38
39
40 func NewRowGroupMetaData(rg *format.RowGroup, sc *schema.Schema, version *AppVersion, decryptor encryption.FileDecryptor) *RowGroupMetaData {
41 return &RowGroupMetaData{
42 rowGroup: rg,
43 Schema: sc,
44 version: version,
45 fileDecryptor: decryptor,
46 }
47 }
48
49
50 func (r *RowGroupMetaData) NumColumns() int {
51 return len(r.rowGroup.GetColumns())
52 }
53
54 func (r *RowGroupMetaData) Equals(other *RowGroupMetaData) bool {
55 return reflect.DeepEqual(r.rowGroup, other.rowGroup)
56 }
57
58
59
60 func (r *RowGroupMetaData) NumRows() int64 { return r.rowGroup.NumRows }
61
62
63 func (r *RowGroupMetaData) TotalByteSize() int64 { return r.rowGroup.GetTotalByteSize() }
64
65
66 func (r *RowGroupMetaData) FileOffset() int64 { return r.rowGroup.GetFileOffset() }
67
68 func (r *RowGroupMetaData) TotalCompressedSize() int64 { return r.rowGroup.GetTotalCompressedSize() }
69
70
71 func (r *RowGroupMetaData) Ordinal() int16 { return r.rowGroup.GetOrdinal() }
72
73
74 func (r *RowGroupMetaData) ColumnChunk(i int) (*ColumnChunkMetaData, error) {
75 if i >= r.NumColumns() {
76 panic(fmt.Errorf("parquet: the file only has %d columns, requested metadata for column: %d", r.NumColumns(), i))
77 }
78
79 return NewColumnChunkMetaData(r.rowGroup.Columns[i], r.Schema.Column(i), r.version, r.rowGroup.GetOrdinal(), int16(i), r.fileDecryptor)
80 }
81
82
83
84 type RowGroupMetaDataBuilder struct {
85 rg *format.RowGroup
86 props *parquet.WriterProperties
87 schema *schema.Schema
88 colBuilders []*ColumnChunkMetaDataBuilder
89 nextCol int
90 }
91
92
93
94
95
96 func NewRowGroupMetaDataBuilder(props *parquet.WriterProperties, schema *schema.Schema, rg *format.RowGroup) *RowGroupMetaDataBuilder {
97 r := &RowGroupMetaDataBuilder{
98 rg: rg,
99 props: props,
100 schema: schema,
101 colBuilders: make([]*ColumnChunkMetaDataBuilder, 0),
102 }
103 r.rg.Columns = make([]*format.ColumnChunk, schema.NumColumns())
104 return r
105 }
106
107
108 func (r *RowGroupMetaDataBuilder) NumColumns() int {
109 return int(len(r.rg.GetColumns()))
110 }
111
112 func (r *RowGroupMetaDataBuilder) NumRows() int64 {
113 return r.rg.GetNumRows()
114 }
115
116 func (r *RowGroupMetaDataBuilder) SetNumRows(nrows int) {
117 r.rg.NumRows = int64(nrows)
118 }
119
120
121
122
123 func (r *RowGroupMetaDataBuilder) CurrentColumn() int { return r.nextCol - 1 }
124
125
126
127 func (r *RowGroupMetaDataBuilder) NextColumnChunk() *ColumnChunkMetaDataBuilder {
128 if r.nextCol >= r.NumColumns() {
129 panic(fmt.Errorf("parquet: the schema only has %d columns, requested metadata for col: %d", r.NumColumns(), r.nextCol))
130 }
131
132 col := r.schema.Column(r.nextCol)
133 if r.rg.Columns[r.nextCol] == nil {
134 r.rg.Columns[r.nextCol] = &format.ColumnChunk{MetaData: format.NewColumnMetaData()}
135 }
136 colBldr := NewColumnChunkMetaDataBuilderWithContents(r.props, col, r.rg.Columns[r.nextCol])
137 r.nextCol++
138 r.colBuilders = append(r.colBuilders, colBldr)
139 return colBldr
140 }
141
142
143
144
145
146 func (r *RowGroupMetaDataBuilder) Finish(totalBytesWritten int64, ordinal int16) error {
147 if r.nextCol != r.NumColumns() {
148 return fmt.Errorf("parquet: only %d out of %d columns are initialized", r.nextCol-1, r.schema.NumColumns())
149 }
150
151 var (
152 fileOffset int64 = 0
153 totalCompressed int64 = 0
154 )
155
156 for idx, col := range r.rg.Columns {
157 if col.FileOffset < 0 {
158 return fmt.Errorf("parquet: Column %d is not complete", idx)
159 }
160 if idx == 0 {
161 if col.MetaData.IsSetDictionaryPageOffset() && col.MetaData.GetDictionaryPageOffset() > 0 {
162 fileOffset = col.MetaData.GetDictionaryPageOffset()
163 } else {
164 fileOffset = col.MetaData.DataPageOffset
165 }
166 }
167
168
169 totalCompressed += r.colBuilders[idx].TotalCompressedSize()
170 }
171
172 r.rg.FileOffset = &fileOffset
173 r.rg.TotalCompressedSize = &totalCompressed
174 r.rg.TotalByteSize = totalBytesWritten
175 r.rg.Ordinal = &ordinal
176 return nil
177 }
178
View as plain text