1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package file
18
19 import (
20 "bytes"
21 "sync"
22
23 "github.com/apache/arrow/go/v15/arrow/memory"
24 "github.com/apache/arrow/go/v15/parquet"
25 "github.com/apache/arrow/go/v15/parquet/compress"
26 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
27 "github.com/apache/arrow/go/v15/parquet/internal/encryption"
28 format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
29 "github.com/apache/arrow/go/v15/parquet/internal/thrift"
30 "github.com/apache/arrow/go/v15/parquet/internal/utils"
31 "github.com/apache/arrow/go/v15/parquet/metadata"
32 libthrift "github.com/apache/thrift/lib/go/thrift"
33 "golang.org/x/xerrors"
34 )
35
36
// PageWriter is the interface satisfied by the page writers in this file
// (serializedPageWriter and bufferedPageWriter). NewPageWriter returns one of
// them depending on its buffered argument.
type PageWriter interface {
	// Close finishes the column chunk metadata and writes it out. hasDict and
	// fallback are passed through to the metadata builder's Finish call.
	Close(hasDict, fallback bool) error
	// WriteDataPage writes the page header followed by the page data and, on
	// success, returns the number of bytes written (header plus data).
	WriteDataPage(page DataPage) (int64, error)
	// WriteDictionaryPage writes the dictionary page header followed by the
	// dictionary data and, on success, returns the number of bytes written.
	WriteDictionaryPage(page *DictionaryPage) (int64, error)
	// HasCompressor reports whether a compression codec is configured.
	HasCompressor() bool
	// Compress compresses src with the configured codec, using buf as the
	// destination buffer, and returns the compressed bytes. Callers should
	// check HasCompressor first: the serialized implementation uses its codec
	// without a nil check.
	// NOTE(review): the returned slice presumably aliases buf's storage — confirm against the codec implementations.
	Compress(buf *bytes.Buffer, src []byte) []byte
	// Reset re-initializes the writer with a new sink, codec, compression
	// level, metadata builder, row group / column ordinals and encryptors so
	// the same object can be reused instead of allocating a new writer.
	Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error
}
55
56 type serializedPageWriter struct {
57 mem memory.Allocator
58 metaData *metadata.ColumnChunkMetaDataBuilder
59 sink utils.WriterTell
60
61 nvalues int64
62 dictPageOffset int64
63 dataPageOffset int64
64 totalUncompressed int64
65 totalCompressed int64
66 pageOrdinal int16
67 rgOrdinal int16
68 columnOrdinal int16
69
70 compressLevel int
71 compressor compress.Codec
72 metaEncryptor encryption.Encryptor
73 dataEncryptor encryption.Encryptor
74 encryptionBuf bytes.Buffer
75
76 dataPageAAD []byte
77 dataPageHeaderAAD []byte
78
79 dictEncodingStats map[parquet.Encoding]int32
80 dataEncodingStats map[parquet.Encoding]int32
81
82 thriftSerializer *thrift.Serializer
83 }
84
85 func createSerializedPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
86 var (
87 compressor compress.Codec
88 err error
89 )
90 if codec != compress.Codecs.Uncompressed {
91 compressor, err = compress.GetCodec(codec)
92 if err != nil {
93 return nil, err
94 }
95 }
96
97 pgwriter := &serializedPageWriter{
98 sink: sink,
99 compressor: compressor,
100 compressLevel: compressionLevel,
101 metaData: metadata,
102 rgOrdinal: rowGroupOrdinal,
103 columnOrdinal: columnChunkOrdinal,
104 mem: mem,
105 metaEncryptor: metaEncryptor,
106 dataEncryptor: dataEncryptor,
107 dictEncodingStats: make(map[parquet.Encoding]int32),
108 dataEncodingStats: make(map[parquet.Encoding]int32),
109 thriftSerializer: thrift.NewThriftSerializer(),
110 }
111 if metaEncryptor != nil || dataEncryptor != nil {
112 pgwriter.initEncryption()
113 }
114 return pgwriter, nil
115 }
116
117
118 func NewPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, buffered bool, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
119 if buffered {
120 return newBufferedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor)
121 }
122 return createSerializedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor)
123 }
124
125
126 func (pw *serializedPageWriter) Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error {
127 var (
128 compressor compress.Codec
129 err error
130 )
131 if codec != compress.Codecs.Uncompressed {
132 compressor, err = compress.GetCodec(codec)
133 if err != nil {
134 return err
135 }
136 }
137
138 pw.sink = sink
139 pw.compressor = compressor
140 pw.compressLevel = compressionLevel
141 pw.metaData = metadata
142 pw.rgOrdinal = rowGroupOrdinal
143 pw.columnOrdinal = columnChunkOrdinal
144 pw.metaEncryptor = metaEncryptor
145 pw.dataEncryptor = dataEncryptor
146 pw.dictEncodingStats = make(map[parquet.Encoding]int32)
147 pw.dataEncodingStats = make(map[parquet.Encoding]int32)
148
149 pw.nvalues = 0
150 pw.dictPageOffset = 0
151 pw.dataPageOffset = 0
152 pw.totalUncompressed = 0
153 pw.totalCompressed = 0
154 pw.pageOrdinal = 0
155
156 if metaEncryptor != nil || dataEncryptor != nil {
157 pw.initEncryption()
158 }
159 return nil
160 }
161
162 func (pw *serializedPageWriter) initEncryption() {
163 if pw.dataEncryptor != nil {
164 pw.dataPageAAD = []byte(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), encryption.DataPageModule, pw.rgOrdinal, pw.columnOrdinal, -1))
165 }
166 if pw.metaEncryptor != nil {
167 pw.dataPageHeaderAAD = []byte(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), encryption.DataPageHeaderModule, pw.rgOrdinal, pw.columnOrdinal, -1))
168 }
169 }
170
171 func (pw *serializedPageWriter) updateEncryption(moduleType int8) error {
172 switch moduleType {
173 case encryption.ColumnMetaModule:
174 pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
175 case encryption.DataPageModule:
176 encryption.QuickUpdatePageAad(pw.dataPageAAD, pw.pageOrdinal)
177 pw.dataEncryptor.UpdateAad(string(pw.dataPageAAD))
178 case encryption.DataPageHeaderModule:
179 encryption.QuickUpdatePageAad(pw.dataPageHeaderAAD, pw.pageOrdinal)
180 pw.metaEncryptor.UpdateAad(string(pw.dataPageHeaderAAD))
181 case encryption.DictPageHeaderModule:
182 pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
183 case encryption.DictPageModule:
184 pw.dataEncryptor.UpdateAad(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
185 default:
186 return xerrors.New("unknown module type in updateencryption")
187 }
188 return nil
189 }
190
191 func (pw *serializedPageWriter) Close(hasDict, fallback bool) error {
192 if pw.metaEncryptor != nil {
193 pw.updateEncryption(encryption.ColumnMetaModule)
194 }
195
196 chunkInfo := metadata.ChunkMetaInfo{
197 NumValues: pw.nvalues,
198 DictPageOffset: pw.dictPageOffset,
199 IndexPageOffset: -1,
200 DataPageOffset: pw.dataPageOffset,
201 CompressedSize: pw.totalCompressed,
202 UncompressedSize: pw.totalUncompressed,
203 }
204 encodingStats := metadata.EncodingStats{
205 DictEncodingStats: pw.dictEncodingStats,
206 DataEncodingStats: pw.dataEncodingStats,
207 }
208 pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats, pw.metaEncryptor)
209 _, err := pw.metaData.WriteTo(pw.sink)
210 return err
211 }
212
213 func (pw *serializedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte {
214 maxCompressed := pw.compressor.CompressBound(int64(len(src)))
215 buf.Grow(int(maxCompressed))
216 return pw.compressor.EncodeLevel(buf.Bytes(), src, pw.compressLevel)
217 }
218
219 var dataPageV1HeaderPool = sync.Pool{
220 New: func() interface{} { return format.NewDataPageHeader() },
221 }
222
223 func (pw *serializedPageWriter) setDataPageHeader(pageHdr *format.PageHeader, page *DataPageV1) {
224 pageHdr.Type = format.PageType_DATA_PAGE
225 hdr := dataPageV1HeaderPool.Get().(*format.DataPageHeader)
226 hdr.NumValues = page.nvals
227 hdr.Encoding = page.encoding
228 hdr.DefinitionLevelEncoding = page.defLvlEncoding
229 hdr.RepetitionLevelEncoding = page.repLvlEncoding
230 hdr.Statistics = page.statistics.ToThrift()
231 pageHdr.DataPageHeader = hdr
232 pageHdr.DataPageHeaderV2 = nil
233 pageHdr.DictionaryPageHeader = nil
234 }
235
236 var dataPageV2HeaderPool = sync.Pool{
237 New: func() interface{} { return format.NewDataPageHeaderV2() },
238 }
239
240 func (pw *serializedPageWriter) setDataPageV2Header(pageHdr *format.PageHeader, page *DataPageV2) {
241 pageHdr.Type = format.PageType_DATA_PAGE_V2
242 hdr := dataPageV2HeaderPool.Get().(*format.DataPageHeaderV2)
243 hdr.NumValues = page.nvals
244 hdr.NumNulls = page.nulls
245 hdr.NumRows = page.nrows
246 hdr.Encoding = page.encoding
247 hdr.DefinitionLevelsByteLength = page.defLvlByteLen
248 hdr.RepetitionLevelsByteLength = page.repLvlByteLen
249 hdr.IsCompressed = page.compressed
250 hdr.Statistics = page.statistics.ToThrift()
251 pageHdr.DataPageHeaderV2 = hdr
252 pageHdr.DataPageHeader = nil
253 pageHdr.DictionaryPageHeader = nil
254 }
255
256 func (pw *serializedPageWriter) HasCompressor() bool { return pw.compressor != nil }
257 func (pw *serializedPageWriter) NumValues() int64 { return pw.nvalues }
258 func (pw *serializedPageWriter) DictionaryPageOffset() int64 { return pw.dictPageOffset }
259 func (pw *serializedPageWriter) DataPageoffset() int64 { return pw.dataPageOffset }
260 func (pw *serializedPageWriter) TotalCompressedSize() int64 { return pw.totalCompressed }
261 func (pw *serializedPageWriter) TotalUncompressedSize() int64 { return pw.totalUncompressed }
262
263 func (pw *serializedPageWriter) WriteDictionaryPage(page *DictionaryPage) (int64, error) {
264 uncompressed := len(page.Data())
265
266 var data []byte
267 if pw.HasCompressor() {
268 var buffer bytes.Buffer
269 data = pw.Compress(&buffer, page.Data())
270
271 } else {
272 data = page.Data()
273 }
274
275 dictPageHeader := &format.DictionaryPageHeader{
276 NumValues: page.NumValues(),
277 Encoding: page.Encoding(),
278 IsSorted: libthrift.BoolPtr(page.IsSorted()),
279 }
280
281 if pw.dataEncryptor != nil {
282 pw.updateEncryption(encryption.DictPageModule)
283 pw.encryptionBuf.Reset()
284 pw.encryptionBuf.Grow(pw.dataEncryptor.CiphertextSizeDelta() + len(data))
285 pw.dataEncryptor.Encrypt(&pw.encryptionBuf, data)
286 data = pw.encryptionBuf.Bytes()
287 }
288
289 pageHdr := pageHeaderPool.Get().(*format.PageHeader)
290 defer pageHeaderPool.Put(pageHdr)
291 pageHdr.Type = format.PageType_DICTIONARY_PAGE
292 pageHdr.UncompressedPageSize = int32(uncompressed)
293 pageHdr.CompressedPageSize = int32(len(data))
294 pageHdr.DictionaryPageHeader = dictPageHeader
295 pageHdr.DataPageHeader = nil
296 pageHdr.DataPageHeaderV2 = nil
297
298 startPos := pw.sink.Tell()
299 if pw.dictPageOffset == 0 {
300 pw.dictPageOffset = int64(startPos)
301 }
302
303 if pw.metaEncryptor != nil {
304 if err := pw.updateEncryption(encryption.DictPageHeaderModule); err != nil {
305 return 0, err
306 }
307 }
308 headerSize, err := pw.thriftSerializer.Serialize(pageHdr, pw.sink, pw.metaEncryptor)
309 if err != nil {
310 return 0, err
311 }
312 written, err := pw.sink.Write(data)
313 if err != nil {
314 return 0, err
315 }
316
317 written += headerSize
318
319 pw.totalUncompressed += int64(uncompressed + headerSize)
320 pw.totalCompressed = int64(written)
321 pw.dictEncodingStats[parquet.Encoding(page.encoding)]++
322 return int64(written), nil
323 }
324
325 var pageHeaderPool = sync.Pool{
326 New: func() interface{} {
327 return format.NewPageHeader()
328 },
329 }
330
331 func (pw *serializedPageWriter) WriteDataPage(page DataPage) (int64, error) {
332 uncompressed := page.UncompressedSize()
333 data := page.Data()
334
335 if pw.dataEncryptor != nil {
336 if err := pw.updateEncryption(encryption.DataPageModule); err != nil {
337 return 0, err
338 }
339 pw.encryptionBuf.Reset()
340 pw.encryptionBuf.Grow(pw.dataEncryptor.CiphertextSizeDelta() + len(data))
341 pw.dataEncryptor.Encrypt(&pw.encryptionBuf, data)
342 data = pw.encryptionBuf.Bytes()
343 }
344
345 pageHdr := pageHeaderPool.Get().(*format.PageHeader)
346 defer pageHeaderPool.Put(pageHdr)
347 pageHdr.UncompressedPageSize = uncompressed
348 pageHdr.CompressedPageSize = int32(len(data))
349
350 switch dpage := page.(type) {
351 case *DataPageV1:
352 pw.setDataPageHeader(pageHdr, dpage)
353 defer dataPageV1HeaderPool.Put(pageHdr.DataPageHeader)
354 case *DataPageV2:
355 pw.setDataPageV2Header(pageHdr, dpage)
356 defer dataPageV2HeaderPool.Put(pageHdr.DataPageHeaderV2)
357 default:
358 return 0, xerrors.New("parquet: unexpected page type")
359 }
360
361 startPos := pw.sink.Tell()
362 if pw.pageOrdinal == 0 {
363 pw.dataPageOffset = int64(startPos)
364 }
365
366 if pw.metaEncryptor != nil {
367 if err := pw.updateEncryption(encryption.DataPageHeaderModule); err != nil {
368 return 0, err
369 }
370 }
371 headerSize, err := pw.thriftSerializer.Serialize(pageHdr, pw.sink, pw.metaEncryptor)
372 if err != nil {
373 return 0, err
374 }
375 written, err := pw.sink.Write(data)
376 if err != nil {
377 return int64(written), err
378 }
379 written += headerSize
380
381 pw.totalUncompressed += int64(uncompressed) + int64(headerSize)
382 pw.totalCompressed += int64(written)
383 pw.nvalues += int64(page.NumValues())
384 pw.dataEncodingStats[parquet.Encoding(page.Encoding())]++
385 pw.pageOrdinal++
386 return int64(written), nil
387 }
388
// bufferedPageWriter is a PageWriter that lets an inner serializedPageWriter
// write into an in-memory buffer and copies that buffer to the final sink
// when Close is called.
type bufferedPageWriter struct {
	// finalSink receives the buffered bytes on Close.
	finalSink utils.WriterTell
	// inMemSink is the in-memory buffer the inner pager writes to.
	inMemSink *encoding.BufferWriter
	// metadata is the column chunk metadata builder finished on Close.
	metadata *metadata.ColumnChunkMetaDataBuilder
	// pager does the actual page serialization into inMemSink.
	pager *serializedPageWriter
	// hasDictionaryPages is set once WriteDictionaryPage has been called.
	hasDictionaryPages bool
}
396
397 func newBufferedPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
398 wr := &bufferedPageWriter{
399 finalSink: sink,
400 metadata: metadata,
401 hasDictionaryPages: false,
402 inMemSink: encoding.NewBufferWriter(0, mem),
403 }
404 pager, err := createSerializedPageWriter(wr.inMemSink, codec, compressionLevel, metadata, rgOrdinal, columnOrdinal, mem, metaEncryptor, dataEncryptor)
405 if err != nil {
406 return nil, err
407 }
408 wr.pager = pager.(*serializedPageWriter)
409 return wr, nil
410 }
411
412 func (bw *bufferedPageWriter) Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error {
413 bw.finalSink = sink
414 bw.metadata = metadata
415 bw.hasDictionaryPages = false
416 bw.inMemSink.Reset(0)
417
418 return bw.pager.Reset(bw.inMemSink, codec, compressionLevel, metadata, rgOrdinal, columnOrdinal, metaEncryptor, dataEncryptor)
419 }
420
421 func (bw *bufferedPageWriter) WriteDictionaryPage(page *DictionaryPage) (int64, error) {
422 bw.hasDictionaryPages = true
423 return bw.pager.WriteDictionaryPage(page)
424 }
425
426 func (bw *bufferedPageWriter) Close(hasDict, fallback bool) error {
427 if bw.pager.metaEncryptor != nil {
428 bw.pager.updateEncryption(encryption.ColumnMetaModule)
429 }
430
431 position := bw.finalSink.Tell()
432 dictOffset := int64(0)
433 if bw.hasDictionaryPages {
434 dictOffset = bw.pager.DictionaryPageOffset() + position
435 }
436
437 chunkInfo := metadata.ChunkMetaInfo{
438 NumValues: bw.pager.NumValues(),
439 DictPageOffset: dictOffset,
440 IndexPageOffset: -1,
441 DataPageOffset: bw.pager.DataPageoffset() + position,
442 CompressedSize: bw.pager.TotalCompressedSize(),
443 UncompressedSize: bw.pager.TotalUncompressedSize(),
444 }
445 encodingStats := metadata.EncodingStats{
446 DictEncodingStats: bw.pager.dictEncodingStats,
447 DataEncodingStats: bw.pager.dataEncodingStats,
448 }
449 bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats, bw.pager.metaEncryptor)
450 bw.metadata.WriteTo(bw.inMemSink)
451
452 buf := bw.inMemSink.Finish()
453 defer buf.Release()
454 _, err := bw.finalSink.Write(buf.Bytes())
455 return err
456 }
457
458 func (bw *bufferedPageWriter) WriteDataPage(page DataPage) (int64, error) {
459 return bw.pager.WriteDataPage(page)
460 }
461
462 func (bw *bufferedPageWriter) HasCompressor() bool {
463 return bw.pager.HasCompressor()
464 }
465
466 func (bw *bufferedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte {
467 return bw.pager.Compress(buf, src)
468 }
469
View as plain text