1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "github.com/apache/arrow/go/v15/parquet"
21 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
22 "github.com/apache/arrow/go/v15/parquet/internal/utils"
23 "github.com/apache/arrow/go/v15/parquet/metadata"
24 "golang.org/x/xerrors"
25 )
26
27
28
// RowGroupWriter is the behaviour shared by every row group writer, whether
// its columns are written one after another (SerialRowGroupWriter) or all
// held open at the same time (BufferedRowGroupWriter).
type RowGroupWriter interface {
	// NumColumns reports how many columns the row group's metadata describes.
	NumColumns() int

	// NumRows reports how many rows have been written so far. It returns an
	// error when the columns written so far disagree on their row counts.
	NumRows() (int, error)

	// TotalCompressedBytes reports the compressed byte count of the column
	// writers that are still held by the row group writer.
	TotalCompressedBytes() int64

	// TotalBytesWritten reports the bytes written by columns that have
	// already been finished plus those reported by writers still held.
	TotalBytesWritten() int64

	// Close closes any column writers that are still open and finalizes the
	// row group metadata. It returns an error if the columns hold differing
	// numbers of rows or a column writer fails to close. Once Close has
	// completed, further calls have no effect.
	Close() error

	// Buffered reports whether the writer gives access to all columns at
	// once (true) or hands them out one at a time (false).
	Buffered() bool
}
47
48
49
50
51 type SerialRowGroupWriter interface {
52 RowGroupWriter
53 NextColumn() (ColumnChunkWriter, error)
54
55
56 CurrentColumn() int
57 }
58
59
60
61
62
63 type BufferedRowGroupWriter interface {
64 RowGroupWriter
65 Column(i int) (ColumnChunkWriter, error)
66 }
67
68 type rowGroupWriter struct {
69 sink utils.WriterTell
70 metadata *metadata.RowGroupMetaDataBuilder
71 props *parquet.WriterProperties
72 bytesWritten int64
73 closed bool
74 ordinal int16
75 nextColumnIdx int
76 nrows int
77 buffered bool
78 fileEncryptor encryption.FileEncryptor
79
80 columnWriters []ColumnChunkWriter
81 pager PageWriter
82 }
83
84 func newRowGroupWriter(sink utils.WriterTell, metadata *metadata.RowGroupMetaDataBuilder, ordinal int16, props *parquet.WriterProperties, buffered bool, fileEncryptor encryption.FileEncryptor) *rowGroupWriter {
85 ret := &rowGroupWriter{
86 sink: sink,
87 metadata: metadata,
88 props: props,
89 ordinal: ordinal,
90 buffered: buffered,
91 fileEncryptor: fileEncryptor,
92 }
93 if buffered {
94 ret.initColumns()
95 } else {
96 ret.columnWriters = []ColumnChunkWriter{nil}
97 }
98 return ret
99 }
100
101 func (rg *rowGroupWriter) Buffered() bool { return rg.buffered }
102
103 func (rg *rowGroupWriter) checkRowsWritten() error {
104 if len(rg.columnWriters) == 0 {
105 return nil
106 }
107
108 if !rg.buffered && rg.columnWriters[0] != nil {
109 current := rg.columnWriters[0].RowsWritten()
110 if rg.nrows == 0 {
111 rg.nrows = current
112 } else if rg.nrows != current {
113 return xerrors.Errorf("row mismatch for unbuffered row group: %d, count expected: %d, actual: %d", rg.ordinal, current, rg.nrows)
114 }
115 } else if rg.buffered {
116 current := rg.columnWriters[0].RowsWritten()
117 for i, wr := range rg.columnWriters[1:] {
118 if current != wr.RowsWritten() {
119 return xerrors.Errorf("row mismatch for buffered row group: %d, column: %d, count expected: %d, actual: %d", rg.ordinal, i+1, current, wr.RowsWritten())
120 }
121 }
122 rg.nrows = current
123 }
124 return nil
125 }
126
127 func (rg *rowGroupWriter) NumColumns() int { return rg.metadata.NumColumns() }
128 func (rg *rowGroupWriter) NumRows() (int, error) {
129 err := rg.checkRowsWritten()
130 return rg.nrows, err
131 }
132
133 func (rg *rowGroupWriter) NextColumn() (ColumnChunkWriter, error) {
134 if rg.buffered {
135 panic("next column is not supported when a rowgroup is written by size")
136 }
137 if rg.columnWriters[0] != nil {
138 if err := rg.checkRowsWritten(); err != nil {
139 return nil, err
140 }
141 }
142
143
144 colMeta := rg.metadata.NextColumnChunk()
145 if rg.columnWriters[0] != nil {
146 if err := rg.columnWriters[0].Close(); err != nil {
147 return nil, err
148 }
149 rg.bytesWritten += rg.columnWriters[0].TotalBytesWritten()
150 }
151 rg.nextColumnIdx++
152
153 path := colMeta.Descr().Path()
154 var (
155 metaEncryptor encryption.Encryptor
156 dataEncryptor encryption.Encryptor
157 )
158 if rg.fileEncryptor != nil {
159 metaEncryptor = rg.fileEncryptor.GetColumnMetaEncryptor(path)
160 dataEncryptor = rg.fileEncryptor.GetColumnDataEncryptor(path)
161 }
162
163 if rg.pager == nil {
164 var err error
165 rg.pager, err = NewPageWriter(rg.sink, rg.props.CompressionFor(path), rg.props.CompressionLevelFor(path), colMeta, rg.ordinal, int16(rg.nextColumnIdx-1), rg.props.Allocator(), false, metaEncryptor, dataEncryptor)
166 if err != nil {
167 return nil, err
168 }
169 } else {
170 rg.pager.Reset(rg.sink, rg.props.CompressionFor(path), rg.props.CompressionLevelFor(path), colMeta, rg.ordinal, int16(rg.nextColumnIdx-1), metaEncryptor, dataEncryptor)
171 }
172
173 rg.columnWriters[0] = NewColumnChunkWriter(colMeta, rg.pager, rg.props)
174 return rg.columnWriters[0], nil
175 }
176
177 func (rg *rowGroupWriter) Column(i int) (ColumnChunkWriter, error) {
178 if !rg.buffered {
179 panic("column is only supported when a bufferedrowgroup is being written")
180 }
181
182 if i >= 0 && i < len(rg.columnWriters) {
183 return rg.columnWriters[i], nil
184 }
185 return nil, xerrors.Errorf("invalid column number requested: %d", i)
186 }
187
188 func (rg *rowGroupWriter) CurrentColumn() int { return rg.metadata.CurrentColumn() }
189 func (rg *rowGroupWriter) TotalCompressedBytes() int64 {
190 total := int64(0)
191 for _, wr := range rg.columnWriters {
192 if wr != nil {
193 total += wr.TotalCompressedBytes()
194 }
195 }
196 return total
197 }
198
199 func (rg *rowGroupWriter) TotalBytesWritten() int64 {
200 total := int64(0)
201 for _, wr := range rg.columnWriters {
202 if wr != nil {
203 total += wr.TotalBytesWritten()
204 }
205 }
206 return total + rg.bytesWritten
207 }
208
209 func (rg *rowGroupWriter) Close() error {
210 if !rg.closed {
211 rg.closed = true
212 if err := rg.checkRowsWritten(); err != nil {
213 return err
214 }
215
216 for _, wr := range rg.columnWriters {
217 if wr != nil {
218 if err := wr.Close(); err != nil {
219 return err
220 }
221 rg.bytesWritten += wr.TotalBytesWritten()
222 }
223 }
224
225 rg.columnWriters = nil
226 rg.metadata.SetNumRows(rg.nrows)
227 rg.metadata.Finish(rg.bytesWritten, rg.ordinal)
228 }
229 return nil
230 }
231
232 func (rg *rowGroupWriter) initColumns() error {
233 if rg.columnWriters == nil {
234 rg.columnWriters = make([]ColumnChunkWriter, 0, rg.NumColumns())
235 }
236 for i := 0; i < rg.NumColumns(); i++ {
237 colMeta := rg.metadata.NextColumnChunk()
238 path := colMeta.Descr().Path()
239 var (
240 metaEncryptor encryption.Encryptor
241 dataEncryptor encryption.Encryptor
242 )
243 if rg.fileEncryptor != nil {
244 metaEncryptor = rg.fileEncryptor.GetColumnMetaEncryptor(path)
245 dataEncryptor = rg.fileEncryptor.GetColumnDataEncryptor(path)
246 }
247 pager, err := NewPageWriter(rg.sink, rg.props.CompressionFor(path), rg.props.CompressionLevelFor(path), colMeta, rg.ordinal, int16(rg.nextColumnIdx), rg.props.Allocator(), rg.buffered, metaEncryptor, dataEncryptor)
248 if err != nil {
249 return err
250 }
251 rg.nextColumnIdx++
252 rg.columnWriters = append(rg.columnWriters, NewColumnChunkWriter(colMeta, pager, rg.props))
253 }
254 return nil
255 }
256
View as plain text