package pqarrow

import (
	"context"
	"encoding/base64"
	"fmt"
	"io"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/flight"
	"github.com/apache/arrow/go/v15/internal/utils"
	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/metadata"
	"golang.org/x/xerrors"
)

// WriteTable is a convenience function that creates a FileWriter for the
// table's schema and writes the full arrow.Table to w as a parquet file,
// using chunkSize rows per row group.
func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error {
	writer, err := NewFileWriter(tbl.Schema(), w, props, arrprops)
	if err != nil {
		return err
	}

	if err := writer.WriteTable(tbl, chunkSize); err != nil {
		return err
	}

	return writer.Close()
}
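
// A minimal usage sketch for WriteTable (not part of this file; it assumes a
// caller-provided arrow.Table named tbl): write the table to out.parquet with
// 1024-row row groups, default parquet writer properties (nil), and default
// Arrow writer properties.
//
//	f, err := os.Create("out.parquet")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//	if err := pqarrow.WriteTable(tbl, f, 1024, nil, pqarrow.DefaultWriterProps()); err != nil {
//		log.Fatal(err)
//	}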

// FileWriter is an object for writing Arrow data, record by record or column
// by column, to a parquet file one row group at a time.
type FileWriter struct {
	wr         *file.Writer
	schema     *arrow.Schema
	manifest   *SchemaManifest
	rgw        file.RowGroupWriter
	arrowProps ArrowWriterProperties
	ctx        context.Context
	colIdx     int
	closed     bool
}

// NewFileWriter returns a writer for writing Arrow data directly to a parquet
// file. If props is nil, parquet.NewWriterProperties() defaults are used. If
// arrprops enables storing the schema, the serialized Arrow schema is added
// to the file metadata under the "ARROW:schema" key.
func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) {
	if props == nil {
		props = parquet.NewWriterProperties()
	}

	pqschema, err := ToParquet(arrschema, props, arrprops)
	if err != nil {
		return nil, err
	}

	meta := make(metadata.KeyValueMetadata, 0)
	for i := 0; i < arrschema.Metadata().Len(); i++ {
		meta.Append(arrschema.Metadata().Keys()[i], arrschema.Metadata().Values()[i])
	}

	if arrprops.storeSchema {
		serializedSchema := flight.SerializeSchema(arrschema, props.Allocator())
		meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema))
	}

	schemaNode := pqschema.Root()
	baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))

	manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{})
	if err != nil {
		return nil, err
	}

	return &FileWriter{wr: baseWriter, schema: arrschema, manifest: manifest, arrowProps: arrprops, ctx: NewArrowWriteContext(context.TODO(), &arrprops)}, nil
}
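
// A construction sketch (assumptions: sc is a caller-built *arrow.Schema and
// f is an io.Writer such as an *os.File): enable Zstd compression via the
// parquet writer properties and store the serialized Arrow schema in the file
// metadata with WithStoreSchema.
//
//	props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Zstd))
//	arrProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
//	fw, err := pqarrow.NewFileWriter(sc, f, props, arrProps)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer fw.Close()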

// NewRowGroup closes the current row group, if one is open, and starts a new
// one in the underlying file.
func (fw *FileWriter) NewRowGroup() {
	if fw.rgw != nil {
		fw.rgw.Close()
	}
	fw.rgw = fw.wr.AppendRowGroup()
	fw.colIdx = 0
}

// NewBufferedRowGroup closes the current row group, if one is open, and
// starts a new memory-buffered row group, allowing data to accumulate across
// multiple writes before the row group is flushed on close.
func (fw *FileWriter) NewBufferedRowGroup() {
	if fw.rgw != nil {
		fw.rgw.Close()
	}
	fw.rgw = fw.wr.AppendBufferedRowGroup()
	fw.colIdx = 0
}

// RowGroupTotalCompressedBytes returns the total number of bytes, after
// compression, written to the current row group so far, or 0 if there is no
// active row group.
func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 {
	if fw.rgw != nil {
		return fw.rgw.TotalCompressedBytes()
	}
	return 0
}

// RowGroupTotalBytesWritten returns the total number of bytes written and
// flushed for the current row group, or 0 if there is no active row group.
func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
	if fw.rgw != nil {
		return fw.rgw.TotalBytesWritten()
	}
	return 0
}

// RowGroupNumRows returns the number of rows written to the current row
// group, or 0 if there is no active row group.
func (fw *FileWriter) RowGroupNumRows() (int, error) {
	if fw.rgw != nil {
		return fw.rgw.NumRows()
	}
	return 0, nil
}

// NumRows returns the total number of rows that have been written to the file
// so far.
func (fw *FileWriter) NumRows() int {
	if fw.wr != nil {
		return fw.wr.NumRows()
	}
	return 0
}

// WriteBuffered writes the record to the current buffered row group, creating
// one if necessary. If adding the record would exceed the writer's maximum
// row group length, the record is sliced and the remaining rows are written
// to additional buffered row groups. The record schema must match the schema
// the writer was created with.
func (fw *FileWriter) WriteBuffered(rec arrow.Record) error {
	if !rec.Schema().Equal(fw.schema) {
		return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
	}

	var (
		recList []arrow.Record
		maxRows = fw.wr.Properties().MaxRowGroupLength()
		curRows int
		err     error
	)
	if fw.rgw != nil {
		if curRows, err = fw.rgw.NumRows(); err != nil {
			return err
		}
	} else {
		fw.NewBufferedRowGroup()
	}

	if int64(curRows)+rec.NumRows() <= maxRows {
		recList = []arrow.Record{rec}
	} else {
		recList = []arrow.Record{rec.NewSlice(0, maxRows-int64(curRows))}
		defer recList[0].Release()
		for offset := maxRows - int64(curRows); offset < rec.NumRows(); offset += maxRows {
			s := rec.NewSlice(offset, offset+utils.Min(maxRows, rec.NumRows()-offset))
			defer s.Release()
			recList = append(recList, s)
		}
	}

	for idx, r := range recList {
		if idx > 0 {
			fw.NewBufferedRowGroup()
		}
		for i := 0; i < int(r.NumCols()); i++ {
			if err := fw.WriteColumnData(r.Column(i)); err != nil {
				fw.Close()
				return err
			}
		}
	}
	fw.colIdx = 0
	return nil
}
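
// A buffered-write sketch (assumptions: fw is a *pqarrow.FileWriter and recs
// is a []arrow.Record sharing the writer's schema): many small records are
// accumulated into the same buffered row group rather than producing one tiny
// row group per record; new row groups are started only once the maximum row
// group length is reached.
//
//	for _, rec := range recs {
//		if err := fw.WriteBuffered(rec); err != nil {
//			log.Fatal(err)
//		}
//	}
//	if err := fw.Close(); err != nil {
//		log.Fatal(err)
//	}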

// Write flushes the current row group, if any, and writes the record as one
// or more new row groups, slicing it when it exceeds the configured maximum
// row group length. The record schema must match the schema the writer was
// created with.
func (fw *FileWriter) Write(rec arrow.Record) error {
	if !rec.Schema().Equal(fw.schema) {
		return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
	}

	var recList []arrow.Record
	rowgroupLen := fw.wr.Properties().MaxRowGroupLength()
	if rec.NumRows() > rowgroupLen {
		recList = make([]arrow.Record, 0)
		for offset := int64(0); offset < rec.NumRows(); offset += rowgroupLen {
			s := rec.NewSlice(offset, offset+utils.Min(rowgroupLen, rec.NumRows()-offset))
			defer s.Release()
			recList = append(recList, s)
		}
	} else {
		recList = []arrow.Record{rec}
	}

	for _, r := range recList {
		fw.NewRowGroup()
		for i := 0; i < int(r.NumCols()); i++ {
			if err := fw.WriteColumnData(r.Column(i)); err != nil {
				fw.Close()
				return err
			}
		}
	}
	fw.colIdx = 0
	return nil
}
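
// A record-at-a-time sketch (assumptions: fw is a *pqarrow.FileWriter and
// recs is a []arrow.Record sharing the writer's schema): each call to Write
// produces at least one new row group, so this pattern suits records that are
// already close to the desired row group size.
//
//	for _, rec := range recs {
//		if err := fw.Write(rec); err != nil {
//			log.Fatal(err)
//		}
//	}
//	if err := fw.Close(); err != nil {
//		log.Fatal(err)
//	}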

// WriteTable writes the entire table to the file, creating a new row group
// for every chunkSize rows. chunkSize is clamped to the writer's maximum row
// group length and must be greater than zero unless the table is empty. The
// table schema must match the schema the writer was created with.
func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error {
	if chunkSize <= 0 && tbl.NumRows() > 0 {
		return xerrors.New("chunk size per row group must be greater than 0")
	} else if !tbl.Schema().Equal(fw.schema) {
		return fmt.Errorf("table schema does not match writer's. \nTable: %s\n writer: %s", tbl.Schema(), fw.schema)
	} else if chunkSize > fw.wr.Properties().MaxRowGroupLength() {
		chunkSize = fw.wr.Properties().MaxRowGroupLength()
	}

	writeRowGroup := func(offset, size int64) error {
		fw.NewRowGroup()
		for i := 0; i < int(tbl.NumCols()); i++ {
			if err := fw.WriteColumnChunked(tbl.Column(i).Data(), offset, size); err != nil {
				return err
			}
		}
		return nil
	}

	if tbl.NumRows() == 0 {
		if err := writeRowGroup(0, 0); err != nil {
			fw.Close()
			return err
		}
		return nil
	}

	for offset := int64(0); offset < tbl.NumRows(); offset += chunkSize {
		if err := writeRowGroup(offset, utils.Min(chunkSize, tbl.NumRows()-offset)); err != nil {
			fw.Close()
			return err
		}
	}
	return nil
}

// AppendKeyValueMetadata appends a key/value pair to the file metadata that
// is written out when the file is closed.
func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error {
	return fw.wr.AppendKeyValueMetadata(key, value)
}

// Close flushes the current row group, if any, releases any buffer held by
// the Arrow write context, and closes the underlying parquet file writer.
// Close is idempotent; calls after the first are no-ops.
func (fw *FileWriter) Close() error {
	if !fw.closed {
		fw.closed = true
		if fw.rgw != nil {
			if err := fw.rgw.Close(); err != nil {
				return err
			}
		}

		writeCtx := arrowCtxFromContext(fw.ctx)
		if writeCtx.dataBuffer != nil {
			writeCtx.dataBuffer.Release()
			writeCtx.dataBuffer = nil
		}

		return fw.wr.Close()
	}
	return nil
}

// WriteColumnChunked writes size rows of the chunked array, beginning at
// offset, as the next column (or set of leaf columns, for nested types) of
// the current row group.
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
	acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
	if err != nil {
		return err
	}
	fw.colIdx += acw.leafCount
	return acw.Write(fw.ctx)
}

// WriteColumnData writes the array as the next column of the current row
// group by wrapping it in a single-chunk chunked array.
func (fw *FileWriter) WriteColumnData(data arrow.Array) error {
	chunked := arrow.NewChunked(data.DataType(), []arrow.Array{data})
	defer chunked.Release()
	return fw.WriteColumnChunked(chunked, 0, int64(data.Len()))
}
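
// A column-at-a-time sketch (assumptions: fw is a *pqarrow.FileWriter and
// cols is a []arrow.Array holding one array per schema field, in schema
// order): the caller starts the row group explicitly and writes each column's
// data in turn.
//
//	fw.NewRowGroup()
//	for _, col := range cols {
//		if err := fw.WriteColumnData(col); err != nil {
//			log.Fatal(err)
//		}
//	}
//	if err := fw.Close(); err != nil {
//		log.Fatal(err)
//	}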