1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "encoding/binary"
21 "fmt"
22 "io"
23
24 "github.com/apache/arrow/go/v15/parquet"
25 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
26 "github.com/apache/arrow/go/v15/parquet/internal/utils"
27 "github.com/apache/arrow/go/v15/parquet/metadata"
28 "github.com/apache/arrow/go/v15/parquet/schema"
29 )
30
31
32 type Writer struct {
33 sink utils.WriteCloserTell
34 open bool
35 props *parquet.WriterProperties
36 rowGroups int
37 nrows int
38 metadata metadata.FileMetaDataBuilder
39 fileEncryptor encryption.FileEncryptor
40 rowGroupWriter *rowGroupWriter
41
42
43 Schema *schema.Schema
44 }
45
46 type writerConfig struct {
47 props *parquet.WriterProperties
48 keyValueMetadata metadata.KeyValueMetadata
49 }
50
51 type WriteOption func(*writerConfig)
52
53 func WithWriterProps(props *parquet.WriterProperties) WriteOption {
54 return func(c *writerConfig) {
55 c.props = props
56 }
57 }
58
59 func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
60 return func(c *writerConfig) {
61 c.keyValueMetadata = meta
62 }
63 }
64
65
66
67
68
69 func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
70 config := &writerConfig{}
71 for _, o := range opts {
72 o(config)
73 }
74 if config.props == nil {
75 config.props = parquet.NewWriterProperties()
76 }
77
78 fileSchema := schema.NewSchema(sc)
79 fw := &Writer{
80 props: config.props,
81 sink: &utils.TellWrapper{Writer: w},
82 open: true,
83 Schema: fileSchema,
84 }
85
86 fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
87 fw.startFile()
88 return fw
89 }
90
91
92 func (fw *Writer) NumColumns() int { return fw.Schema.NumColumns() }
93
94
95 func (fw *Writer) NumRowGroups() int { return fw.rowGroups }
96
97
98 func (fw *Writer) NumRows() int { return fw.nrows }
99
100
101 func (fw *Writer) Properties() *parquet.WriterProperties { return fw.props }
102
103
104
105
106
107
108
109 func (fw *Writer) AppendBufferedRowGroup() BufferedRowGroupWriter {
110 return fw.appendRowGroup(true)
111 }
112
113
114
115
116
117
118 func (fw *Writer) AppendRowGroup() SerialRowGroupWriter {
119 return fw.appendRowGroup(false)
120 }
121
122 func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
123 if fw.rowGroupWriter != nil {
124 fw.nrows += fw.rowGroupWriter.nrows
125 fw.rowGroupWriter.Close()
126 }
127 fw.rowGroups++
128 rgMeta := fw.metadata.AppendRowGroup()
129 fw.rowGroupWriter = newRowGroupWriter(fw.sink, rgMeta, int16(fw.rowGroups)-1, fw.props, buffered, fw.fileEncryptor)
130 return fw.rowGroupWriter
131 }
132
133 func (fw *Writer) startFile() {
134 encryptionProps := fw.props.FileEncryptionProperties()
135 magic := magicBytes
136 if encryptionProps != nil {
137
138 encryptedCols := encryptionProps.EncryptedColumns()
139
140 if len(encryptedCols) != 0 {
141 colPaths := make(map[string]bool)
142 for i := 0; i < fw.Schema.NumColumns(); i++ {
143 colPaths[fw.Schema.Column(i).Path()] = true
144 }
145 for k := range encryptedCols {
146 if _, ok := colPaths[k]; !ok {
147 panic("encrypted column " + k + " not found in file schema")
148 }
149 }
150 }
151
152 fw.fileEncryptor = encryption.NewFileEncryptor(encryptionProps, fw.props.Allocator())
153 if encryptionProps.EncryptedFooter() {
154 magic = magicEBytes
155 }
156 }
157 n, err := fw.sink.Write(magic)
158 if n != 4 || err != nil {
159 panic("failed to write magic number")
160 }
161 }
162
163
164 func (fw *Writer) AppendKeyValueMetadata(key string, value string) error {
165 return fw.metadata.AppendKeyValueMetadata(key, value)
166 }
167
168
169
170 func (fw *Writer) Close() (err error) {
171 if fw.open {
172
173
174 fw.open = false
175 if fw.rowGroupWriter != nil {
176 fw.nrows += fw.rowGroupWriter.nrows
177 fw.rowGroupWriter.Close()
178 }
179 fw.rowGroupWriter = nil
180 defer func() {
181 ierr := fw.sink.Close()
182 if err != nil {
183 if ierr != nil {
184 err = fmt.Errorf("error on close:%w, %s", err, ierr)
185 }
186 return
187 }
188
189 err = ierr
190 }()
191
192 fileEncryptProps := fw.props.FileEncryptionProperties()
193 if fileEncryptProps == nil {
194 fileMetadata, err := fw.metadata.Finish()
195 if err != nil {
196 return err
197 }
198
199 _, err = writeFileMetadata(fileMetadata, fw.sink)
200 return err
201 }
202
203 return fw.closeEncryptedFile(fileEncryptProps)
204 }
205 return nil
206 }
207
208 func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) error {
209
210 if props.EncryptedFooter() {
211 fileMetadata, err := fw.metadata.Finish()
212 if err != nil {
213 return err
214 }
215
216 footerLen := int64(0)
217
218 cryptoMetadata := fw.metadata.GetFileCryptoMetaData()
219 n, err := writeFileCryptoMetadata(cryptoMetadata, fw.sink)
220 if err != nil {
221 return err
222 }
223
224 footerLen += n
225 footerEncryptor := fw.fileEncryptor.GetFooterEncryptor()
226 n, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerEncryptor, true)
227 if err != nil {
228 return err
229 }
230 footerLen += n
231
232 if err = binary.Write(fw.sink, binary.LittleEndian, uint32(footerLen)); err != nil {
233 return err
234 }
235 if _, err = fw.sink.Write(magicEBytes); err != nil {
236 return err
237 }
238 } else {
239 fileMetadata, err := fw.metadata.Finish()
240 if err != nil {
241 return err
242 }
243 footerSigningEncryptor := fw.fileEncryptor.GetFooterSigningEncryptor()
244 if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
245 return err
246 }
247 }
248 if fw.fileEncryptor != nil {
249 fw.fileEncryptor.WipeOutEncryptionKeys()
250 }
251 return nil
252 }
253
254 func writeFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer) (n int64, err error) {
255 n, err = fileMetadata.WriteTo(w, nil)
256 if err != nil {
257 return
258 }
259
260 if err = binary.Write(w, binary.LittleEndian, uint32(n)); err != nil {
261 return
262 }
263 if _, err = w.Write(magicBytes); err != nil {
264 return
265 }
266 return n + int64(4+len(magicBytes)), nil
267 }
268
269 func writeEncryptedFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer, encryptor encryption.Encryptor, encryptFooter bool) (n int64, err error) {
270 n, err = fileMetadata.WriteTo(w, encryptor)
271 if encryptFooter {
272 return
273 }
274 if err != nil {
275 return
276 }
277 if err = binary.Write(w, binary.LittleEndian, uint32(n)); err != nil {
278 return
279 }
280 if _, err = w.Write(magicBytes); err != nil {
281 return
282 }
283 return n + int64(4+len(magicBytes)), nil
284 }
285
286 func writeFileCryptoMetadata(crypto *metadata.FileCryptoMetadata, w io.Writer) (int64, error) {
287 return crypto.WriteTo(w)
288 }
289
View as plain text