17 package file
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "io"
23
24 "github.com/apache/arrow/go/v15/arrow"
25 "github.com/apache/arrow/go/v15/arrow/array"
26 "github.com/apache/arrow/go/v15/arrow/bitutil"
27 "github.com/apache/arrow/go/v15/arrow/memory"
28 "github.com/apache/arrow/go/v15/parquet"
29 "github.com/apache/arrow/go/v15/parquet/internal/encoding"
30 "github.com/apache/arrow/go/v15/parquet/metadata"
31 "github.com/apache/arrow/go/v15/parquet/schema"
32 )
33
34
35
36
37
38
// ColumnChunkWriter is the base interface for all column chunk writers.
// To write typed data directly to the column, assert it to the
// appropriate concrete/typed writer.
type ColumnChunkWriter interface {
	// Close ends this column chunk, flushing buffered pages, and returns
	// any error encountered.
	Close() error
	// Type returns the underlying physical parquet type for this column.
	Type() parquet.Type
	// Descr returns the schema descriptor for this column.
	Descr() *schema.Column
	// RowsWritten returns the number of rows written so far (including
	// rows still buffered for the current page).
	RowsWritten() int
	// TotalCompressedBytes returns the size of the compressed pages
	// accumulated so far.
	TotalCompressedBytes() int64
	// TotalBytesWritten returns the number of bytes physically written to
	// the underlying pager, including dictionary pages.
	TotalBytesWritten() int64
	// Properties returns the writer properties in use for this writer.
	Properties() *parquet.WriterProperties
	// CurrentEncoder returns the encoder currently being used to encode
	// new data written to this column.
	CurrentEncoder() encoding.TypedEncoder
	// FallbackToPlain forces a dictionary-encoded writer to fall back to
	// plain encoding.
	FallbackToPlain()
	// PageStatistics returns the statistics accumulator for the current
	// page; may be nil when statistics are disabled.
	PageStatistics() metadata.TypedStatistics
	// WriteDictIndices writes an arrow array of dictionary indices along
	// with the provided definition and repetition levels.
	WriteDictIndices(arrow.Array, []int16, []int16) error

	// LevelInfo returns the def/rep level bounds computed for this column.
	LevelInfo() LevelInfo
	// SetBitsBuffer attaches a validity-bitmap scratch buffer.
	SetBitsBuffer(*memory.Buffer)
	// HasBitsBuffer reports whether a validity-bitmap buffer is attached.
	HasBitsBuffer() bool
}
80
81 func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
82 info.DefLevel = descr.MaxDefinitionLevel()
83 info.RepLevel = descr.MaxRepetitionLevel()
84
85 minSpacedDefLevel := descr.MaxDefinitionLevel()
86 n := descr.SchemaNode()
87 for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated {
88 if n.RepetitionType() == parquet.Repetitions.Optional {
89 minSpacedDefLevel--
90 }
91 n = n.Parent()
92 }
93 info.RepeatedAncestorDefLevel = minSpacedDefLevel
94 return
95 }
96
// columnWriter is the shared base state and logic used by all typed column
// chunk writer implementations in this package.
type columnWriter struct {
	metaData *metadata.ColumnChunkMetaDataBuilder
	descr    *schema.Column

	// scratch validity bitmap used when computing/writing spaced values
	bitsBuffer *memory.Buffer
	levelInfo  LevelInfo
	pager      PageWriter
	hasDict    bool
	encoding   parquet.Encoding
	props      *parquet.WriterProperties
	defEncoder encoding.LevelEncoder
	repEncoder encoding.LevelEncoder
	mem        memory.Allocator

	// per-page and per-chunk statistics accumulators; nil when statistics
	// are disabled for this column
	pageStatistics  metadata.TypedStatistics
	chunkStatistics metadata.TypedStatistics

	// number of level slots buffered for the current page, including
	// slots corresponding to nulls
	numBufferedValues int64

	// number of rows buffered for the current page; for repeated columns
	// one row may span several values (rep level 0 starts a new row)
	numBufferedRows int

	// number of non-null (physical) values buffered for the current page
	numDataValues int64

	rowsWritten       int
	totalBytesWritten int64
	// accumulated compressed page sizes; cleared when the buffered pages
	// are flushed to the pager (see FlushBufferedDataPages)
	totalCompressedBytes int64
	closed               bool
	fallbackToNonDict    bool

	// pages buffered while dictionary encoding is still active
	pages []DataPage

	// sinks receiving the RLE-encoded definition/repetition levels
	defLevelSink *encoding.PooledBufferWriter
	repLevelSink *encoding.PooledBufferWriter

	// scratch buffers reused for every page build/compression
	uncompressedData bytes.Buffer
	compressedTemp   *bytes.Buffer

	currentEncoder encoding.TypedEncoder
}
148
// newColumnWriterBase assembles the common state for a typed column chunk
// writer: RLE level encoders, optional page/chunk statistics, and a scratch
// compression buffer when the pager has a compressor.
func newColumnWriterBase(metaData *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) columnWriter {
	ret := columnWriter{
		metaData:     metaData,
		descr:        metaData.Descr(),
		levelInfo:    computeLevelInfo(metaData.Descr()),
		pager:        pager,
		hasDict:      useDict,
		encoding:     enc,
		props:        props,
		mem:          props.Allocator(),
		defLevelSink: encoding.NewPooledBufferWriter(0),
		repLevelSink: encoding.NewPooledBufferWriter(0),
	}
	if pager.HasCompressor() {
		// scratch buffer reused for compressing every page
		ret.compressedTemp = new(bytes.Buffer)
	}
	// statistics are only tracked when enabled for this column AND the
	// type's sort order is known
	if props.StatisticsEnabledFor(ret.descr.Path()) && ret.descr.SortOrder() != schema.SortUNKNOWN {
		ret.pageStatistics = metadata.NewStatistics(ret.descr, props.Allocator())
		ret.chunkStatistics = metadata.NewStatistics(ret.descr, props.Allocator())
	}

	// levels inside data pages are always RLE-encoded
	ret.defEncoder.Init(parquet.Encodings.RLE, ret.descr.MaxDefinitionLevel(), ret.defLevelSink)
	ret.repEncoder.Init(parquet.Encodings.RLE, ret.descr.MaxRepetitionLevel(), ret.repLevelSink)

	// reset must run after the encoders are initialized: it resets them
	// and reserves the V1 length-prefix space in the level sinks
	ret.reset()

	return ret
}
177
// CurrentEncoder returns the encoder currently buffering values for this column.
func (w *columnWriter) CurrentEncoder() encoding.TypedEncoder { return w.currentEncoder }

// HasBitsBuffer reports whether a validity-bitmap scratch buffer is attached.
func (w *columnWriter) HasBitsBuffer() bool { return w.bitsBuffer != nil }

// SetBitsBuffer attaches a validity-bitmap scratch buffer for spaced writes.
func (w *columnWriter) SetBitsBuffer(buf *memory.Buffer) { w.bitsBuffer = buf }

// PageStatistics returns the current page's statistics accumulator (nil when disabled).
func (w *columnWriter) PageStatistics() metadata.TypedStatistics { return w.pageStatistics }

// LevelInfo returns the def/rep level bounds computed for this column.
func (w *columnWriter) LevelInfo() LevelInfo { return w.levelInfo }
183
// Type returns the underlying physical parquet type for this column.
func (w *columnWriter) Type() parquet.Type {
	return w.descr.PhysicalType()
}

// Descr returns the schema descriptor for this column.
func (w *columnWriter) Descr() *schema.Column {
	return w.descr
}

// Properties returns the writer properties this writer was created with.
func (w *columnWriter) Properties() *parquet.WriterProperties {
	return w.props
}

// TotalCompressedBytes returns the accumulated compressed size of the pages
// built so far (cleared when buffered pages are flushed).
func (w *columnWriter) TotalCompressedBytes() int64 {
	return w.totalCompressedBytes
}

// TotalBytesWritten returns the number of bytes physically written to the
// pager, including dictionary pages.
func (w *columnWriter) TotalBytesWritten() int64 {
	return w.totalBytesWritten
}

// RowsWritten returns rows already flushed to pages plus the rows currently
// buffered for the next page.
func (w *columnWriter) RowsWritten() int {
	return w.rowsWritten + w.numBufferedRows
}
207
// WriteDataPage hands a finished data page to the pager and accumulates the
// number of bytes physically written, returning any write error.
func (w *columnWriter) WriteDataPage(page DataPage) error {
	written, err := w.pager.WriteDataPage(page)
	w.totalBytesWritten += written
	return err
}
213
// WriteDefinitionLevels buffers definition levels into the RLE encoder
// without flushing them to the sink.
func (w *columnWriter) WriteDefinitionLevels(levels []int16) {
	w.defEncoder.EncodeNoFlush(levels)
}

// WriteRepetitionLevels buffers repetition levels into the RLE encoder
// without flushing them to the sink.
func (w *columnWriter) WriteRepetitionLevels(levels []int16) {
	w.repEncoder.EncodeNoFlush(levels)
}
221
// reset clears the level sinks and encoders in preparation for buffering
// the next data page.
func (w *columnWriter) reset() {
	w.defLevelSink.Reset(0)
	w.repLevelSink.Reset(0)

	if w.props.DataPageVersion() == parquet.DataPageV1 {
		// V1 data pages prefix each RLE level run with a 4-byte
		// little-endian length, so reserve that space up front;
		// FlushCurrentPage fills the length in later.
		if w.descr.MaxDefinitionLevel() > 0 {
			w.defLevelSink.SetOffset(arrow.Uint32SizeBytes)
		}
		if w.descr.MaxRepetitionLevel() > 0 {
			w.repLevelSink.SetOffset(arrow.Uint32SizeBytes)
		}
	}

	w.defEncoder.Reset(w.descr.MaxDefinitionLevel())
	w.repEncoder.Reset(w.descr.MaxRepetitionLevel())
}
240
// concatBuffers assembles a page body into wr in the order required by the
// Parquet page layout: repetition levels, then definition levels, then the
// encoded values. Note the parameter order (def size first) is the reverse
// of the write order.
func (w *columnWriter) concatBuffers(defLevelsSize, repLevelsSize int32, values []byte, wr io.Writer) {
	wr.Write(w.repLevelSink.Bytes()[:repLevelsSize])
	wr.Write(w.defLevelSink.Bytes()[:defLevelsSize])
	wr.Write(values)
}
246
// EstimatedBufferedValueBytes reports the encoder's estimate of the encoded
// size of the values buffered for the current page.
func (w *columnWriter) EstimatedBufferedValueBytes() int64 {
	return w.currentEncoder.EstimatedDataEncodedSize()
}

// commitWriteAndCheckPageLimit records a completed batch (numLevels level
// slots, numValues non-null values) and flushes the current page once the
// estimated encoded size reaches the configured data page size.
func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64) error {
	w.numBufferedValues += numLevels
	w.numDataValues += numValues

	enc := w.currentEncoder.EstimatedDataEncodedSize()
	if enc >= w.props.DataPageSize() {
		return w.FlushCurrentPage()
	}
	return nil
}
261
// FlushCurrentPage encodes all currently buffered levels and values into a
// single data page (V1 or V2 according to the writer properties), then
// resets the buffered state ready for the next page.
func (w *columnWriter) FlushCurrentPage() error {
	var (
		defLevelsRLESize int32 = 0
		repLevelsRLESize int32 = 0
	)

	// pull the encoded values accumulated since the last flush
	values, err := w.currentEncoder.FlushValues()
	if err != nil {
		return err
	}
	defer values.Release()

	isV1DataPage := w.props.DataPageVersion() == parquet.DataPageV1
	if w.descr.MaxDefinitionLevel() > 0 {
		w.defEncoder.Flush()
		// rewind past the prefix space reserved by reset() so the whole
		// sink (prefix + levels) is addressable
		w.defLevelSink.SetOffset(0)
		sz := w.defEncoder.Len()
		if isV1DataPage {
			// V1 stores the RLE byte length in the 4 reserved bytes
			sz += arrow.Uint32SizeBytes
			binary.LittleEndian.PutUint32(w.defLevelSink.Bytes(), uint32(w.defEncoder.Len()))
		}
		defLevelsRLESize = int32(sz)
	}

	if w.descr.MaxRepetitionLevel() > 0 {
		w.repEncoder.Flush()
		w.repLevelSink.SetOffset(0)
		if isV1DataPage {
			binary.LittleEndian.PutUint32(w.repLevelSink.Bytes(), uint32(w.repEncoder.Len()))
		}
		// the sink length already includes the reserved prefix for V1, so
		// this is equivalent to the def-level computation above
		repLevelsRLESize = int32(w.repLevelSink.Len())
	}

	uncompressed := defLevelsRLESize + repLevelsRLESize + int32(values.Len())
	if isV1DataPage {
		err = w.buildDataPageV1(defLevelsRLESize, repLevelsRLESize, uncompressed, values.Bytes())
	} else {
		err = w.buildDataPageV2(defLevelsRLESize, repLevelsRLESize, uncompressed, values.Bytes())
	}

	// start fresh for the next page regardless of the build result
	w.reset()
	w.rowsWritten += w.numBufferedRows
	w.numBufferedValues, w.numDataValues, w.numBufferedRows = 0, 0, 0
	return err
}
307
// buildDataPageV1 assembles a V1 data page (rep levels + def levels + values,
// compressed as a single unit) from the flushed buffers. While dictionary
// encoding is still active the page is buffered so the dictionary page can
// precede it in the chunk; otherwise it is written to the pager immediately.
func (w *columnWriter) buildDataPageV1(defLevelsRLESize, repLevelsRLESize, uncompressed int32, values []byte) error {
	w.uncompressedData.Reset()
	w.uncompressedData.Grow(int(uncompressed))
	w.concatBuffers(defLevelsRLESize, repLevelsRLESize, values, &w.uncompressedData)

	pageStats, err := w.getPageStatistics()
	if err != nil {
		return err
	}
	pageStats.ApplyStatSizeLimits(int(w.props.MaxStatsSizeFor(w.descr.Path())))
	pageStats.Signed = schema.SortSIGNED == w.descr.SortOrder()
	// fold page stats into chunk stats and clear them for the next page
	w.resetPageStatistics()

	var data []byte
	if w.pager.HasCompressor() {
		w.compressedTemp.Reset()
		data = w.pager.Compress(w.compressedTemp, w.uncompressedData.Bytes())
	} else {
		data = w.uncompressedData.Bytes()
	}

	if w.hasDict && !w.fallbackToNonDict {
		// buffered until Close or a fallback to plain encoding; the bytes
		// must be copied because uncompressedData/compressedTemp are
		// reused for every page
		pageSlice := make([]byte, len(data))
		copy(pageSlice, data)
		page := NewDataPageV1WithStats(memory.NewBufferBytes(pageSlice), int32(w.numBufferedValues), w.encoding, parquet.Encodings.RLE, parquet.Encodings.RLE, uncompressed, pageStats)
		w.totalCompressedBytes += int64(page.buf.Len())
		w.pages = append(w.pages, page)
	} else {
		// no buffering needed: write straight through (data is consumed
		// before the scratch buffers are reused)
		w.totalCompressedBytes += int64(len(data))
		dp := NewDataPageV1WithStats(memory.NewBufferBytes(data), int32(w.numBufferedValues), w.encoding, parquet.Encodings.RLE, parquet.Encodings.RLE, uncompressed, pageStats)
		defer dp.Release()
		return w.WriteDataPage(dp)
	}
	return nil
}
344
// buildDataPageV2 assembles a V2 data page. Unlike V1, only the values
// section is compressed; the repetition/definition levels are prepended
// uncompressed (per the V2 page layout).
func (w *columnWriter) buildDataPageV2(defLevelsRLESize, repLevelsRLESize, uncompressed int32, values []byte) error {
	var data []byte
	if w.pager.HasCompressor() {
		w.compressedTemp.Reset()
		data = w.pager.Compress(w.compressedTemp, values)
	} else {
		data = values
	}

	// rep levels, def levels, then (possibly compressed) values
	var combined bytes.Buffer
	combined.Grow(int(defLevelsRLESize + repLevelsRLESize + int32(len(data))))
	w.concatBuffers(defLevelsRLESize, repLevelsRLESize, data, &combined)

	pageStats, err := w.getPageStatistics()
	if err != nil {
		return err
	}
	pageStats.ApplyStatSizeLimits(int(w.props.MaxStatsSizeFor(w.descr.Path())))
	pageStats.Signed = schema.SortSIGNED == w.descr.SortOrder()
	// fold page stats into chunk stats and clear them for the next page
	w.resetPageStatistics()

	numValues := int32(w.numBufferedValues)
	numRows := int32(w.numBufferedRows)
	nullCount := int32(pageStats.NullCount)
	defLevelsByteLen := int32(defLevelsRLESize)
	repLevelsByteLen := int32(repLevelsRLESize)

	page := NewDataPageV2WithStats(memory.NewBufferBytes(combined.Bytes()), numValues, nullCount, numRows, w.encoding,
		defLevelsByteLen, repLevelsByteLen, uncompressed, w.pager.HasCompressor(), pageStats)
	if w.hasDict && !w.fallbackToNonDict {
		// buffered until Close or a fallback to plain encoding so the
		// dictionary page can precede the data pages in the chunk
		w.totalCompressedBytes += int64(page.buf.Len())
		w.pages = append(w.pages, page)
	} else {
		w.totalCompressedBytes += int64(combined.Len())
		defer page.Release()
		return w.WriteDataPage(page)
	}
	return nil
}
385
// FlushBufferedDataPages writes out any partially-filled current page, then
// all pages that were buffered while dictionary encoding was active, and
// clears the buffered-page state.
func (w *columnWriter) FlushBufferedDataPages() (err error) {
	// write any remaining buffered data to a final page
	if w.numBufferedValues > 0 {
		if err = w.FlushCurrentPage(); err != nil {
			return err
		}
	}

	for _, p := range w.pages {
		// deferred releases run when the function returns, so every page
		// iterated so far is released even if a later write fails
		defer p.Release()
		if err = w.WriteDataPage(p); err != nil {
			return err
		}
	}
	w.pages = w.pages[:0]
	w.totalCompressedBytes = 0
	return
}
403
// writeLevels buffers numValues definition and repetition levels and returns
// the number of non-null values to actually write (those whose definition
// level equals the column's max). It also tracks buffered rows: for repeated
// columns a repetition level of 0 starts a new row; otherwise each value is
// exactly one row.
func (w *columnWriter) writeLevels(numValues int64, defLevels, repLevels []int16) int64 {
	toWrite := int64(0)
	// a required, non-repeated column has no definition levels
	if defLevels != nil && w.descr.MaxDefinitionLevel() > 0 {
		for _, v := range defLevels[:numValues] {
			if v == w.descr.MaxDefinitionLevel() {
				toWrite++
			}
		}
		w.WriteDefinitionLevels(defLevels[:numValues])
	} else {
		// without def levels every value is defined
		toWrite = numValues
	}

	if repLevels != nil && w.descr.MaxRepetitionLevel() > 0 {
		// a row may span several values: count the positions where a new
		// row starts (rep level 0)
		for _, v := range repLevels[:numValues] {
			if v == 0 {
				w.numBufferedRows++
			}
		}

		w.WriteRepetitionLevels(repLevels[:numValues])
	} else {
		// non-repeated column: each value is exactly one row
		w.numBufferedRows += int(numValues)
	}
	return toWrite
}
434
435 func (w *columnWriter) writeLevelsSpaced(numLevels int64, defLevels, repLevels []int16) {
436 if w.descr.MaxDefinitionLevel() > 0 {
437 w.WriteDefinitionLevels(defLevels[:numLevels])
438 }
439
440 if w.descr.MaxRepetitionLevel() > 0 {
441 for _, v := range repLevels {
442 if v == 0 {
443 w.numBufferedRows++
444 }
445 }
446 w.WriteRepetitionLevels(repLevels[:numLevels])
447 } else {
448 w.numBufferedRows += int(numLevels)
449 }
450 }
451
// WriteDictionaryPage encodes the accumulated dictionary into a new buffer
// and writes it to the pager, accumulating the bytes written. Only valid
// while dictionary encoding is active: the type assertion on the current
// encoder panics otherwise.
func (w *columnWriter) WriteDictionaryPage() error {
	dictEncoder := w.currentEncoder.(encoding.DictEncoder)
	buffer := memory.NewResizableBuffer(w.mem)
	buffer.Resize(dictEncoder.DictEncodedSize())
	dictEncoder.WriteDict(buffer.Bytes())
	defer buffer.Release()

	page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding())
	written, err := w.pager.WriteDictionaryPage(page)
	w.totalBytesWritten += written
	return err
}
464
// batchWriteInfo summarizes one batch: how many non-null values will be
// written (batchNum) and how many null slots accompany them (nullCount).
type batchWriteInfo struct {
	batchNum  int64
	nullCount int64
}

// numSpaced returns the total number of slots (values + nulls) in the batch.
func (b batchWriteInfo) numSpaced() int64 { return b.batchNum + b.nullCount }
471
472
473
474
475
// maybeCalculateValidityBits determines, for a batch of definition levels,
// how many actual values (batchNum) and how many null slots (nullCount)
// will be written. When a bits buffer is attached it additionally populates
// that buffer with the validity bitmap via DefLevelsToBitmap.
func (w *columnWriter) maybeCalculateValidityBits(defLevels []int16, batchSize int64) (out batchWriteInfo) {
	if w.bitsBuffer == nil {
		if w.levelInfo.DefLevel == 0 {
			// max def level of 0 means nothing can be null: every slot in
			// the batch is a value
			out.batchNum = batchSize
			out.nullCount = 0
		} else {
			var (
				toWrite       int64 // levels at the max def level: actual values
				spacedToWrite int64 // levels at/above the repeated-ancestor level: occupy a slot
			)
			for i := int64(0); i < batchSize; i++ {
				if defLevels[i] == w.levelInfo.DefLevel {
					toWrite++
				}
				if defLevels[i] >= w.levelInfo.RepeatedAncestorDefLevel {
					spacedToWrite++
				}
			}
			out.batchNum += toWrite
			out.nullCount = spacedToWrite - toWrite
		}
		return
	}

	// grow the validity bitmap if this batch needs more room
	newBitmapSize := bitutil.BytesForBits(batchSize)
	if newBitmapSize != int64(w.bitsBuffer.Len()) {
		w.bitsBuffer.ResizeNoShrink(int(newBitmapSize))
	}

	io := ValidityBitmapInputOutput{
		ValidBits:      w.bitsBuffer.Bytes(),
		ReadUpperBound: batchSize,
	}
	DefLevelsToBitmap(defLevels[:batchSize], w.levelInfo, &io)
	out.batchNum = io.Read - io.NullCount
	out.nullCount = io.NullCount
	return
}
519
// getPageStatistics encodes the current page's statistics; returns the zero
// value when statistics are disabled for this column.
func (w *columnWriter) getPageStatistics() (enc metadata.EncodedStatistics, err error) {
	if w.pageStatistics != nil {
		enc, err = w.pageStatistics.Encode()
	}
	return
}

// getChunkStatistics encodes the accumulated chunk-level statistics; returns
// the zero value when statistics are disabled for this column.
func (w *columnWriter) getChunkStatistics() (enc metadata.EncodedStatistics, err error) {
	if w.chunkStatistics != nil {
		enc, err = w.chunkStatistics.Encode()
	}
	return
}

// resetPageStatistics folds the finished page's statistics into the chunk
// statistics and clears the page accumulator for the next page.
func (w *columnWriter) resetPageStatistics() {
	if w.chunkStatistics != nil {
		w.chunkStatistics.Merge(w.pageStatistics)
		w.pageStatistics.Reset()
	}
}
540
541 func (w *columnWriter) Close() (err error) {
542 if !w.closed {
543 w.closed = true
544 if w.hasDict && !w.fallbackToNonDict {
545 w.WriteDictionaryPage()
546 }
547
548 if err = w.FlushBufferedDataPages(); err != nil {
549 return err
550 }
551
552
553
554 defer func() {
555 w.defLevelSink.Reset(0)
556 w.repLevelSink.Reset(0)
557 if w.bitsBuffer != nil {
558 w.bitsBuffer.Release()
559 w.bitsBuffer = nil
560 }
561
562 w.currentEncoder.Release()
563 w.currentEncoder = nil
564 }()
565
566 var chunkStats metadata.EncodedStatistics
567 chunkStats, err = w.getChunkStatistics()
568 if err != nil {
569 return err
570 }
571
572 chunkStats.ApplyStatSizeLimits(int(w.props.MaxStatsSizeFor(w.descr.Path())))
573 chunkStats.Signed = schema.SortSIGNED == w.descr.SortOrder()
574
575 if w.rowsWritten > 0 && chunkStats.IsSet() {
576 w.metaData.SetStats(chunkStats)
577 }
578 err = w.pager.Close(w.hasDict, w.fallbackToNonDict)
579 }
580 return err
581 }
582
// doBatches invokes action over total level slots in chunks of the
// configured write batch size. For V2 data pages on repeated columns each
// batch is trimmed back so it ends on a row boundary (repetition level 0),
// since a V2 data page may not split a row across pages.
func (w *columnWriter) doBatches(total int64, repLevels []int16, action func(offset, batch int64)) {
	batchSize := w.props.WriteBatchSize()

	// V1 pages, or columns without repetition levels, may be split at
	// arbitrary points
	if w.props.DataPageVersion() == parquet.DataPageV1 || repLevels == nil || w.descr.MaxRepetitionLevel() == 0 {
		doBatches(total, batchSize, action)
		return
	}

	// we need at least as many repetition levels as values we intend to
	// write
	if int64(len(repLevels)) < total {
		panic("columnwriter: not enough repetition levels for batch to write")
	}

	// a repetition level of 0 indicates a new row, and the batch must
	// begin on one
	if repLevels[0] != 0 {
		panic("columnwriter: batch writing for V2 data pages must start at a row boundary")
	}

	// NOTE(review): batching below is driven by len(repLevels), not total —
	// presumably callers pass a slice of exactly `total` levels; confirm.
	var (
		batchStart, batch int64
	)
	for batchStart = 0; batchStart+batchSize < int64(len(repLevels)); batchStart += batch {
		// shrink the batch until the element just past it starts a new row.
		// NOTE(review): assumes a row boundary exists within the window; a
		// single row spanning the whole batch would walk batch below zero —
		// presumably ruled out by callers; confirm.
		batch = batchSize
		for ; repLevels[batchStart+batch] != 0; batch-- {
		}

		action(batchStart, batch)
	}
	// the final (possibly short) batch
	action(batchStart, int64(len(repLevels))-batchStart)
}
624
// doBatches splits total into consecutive chunks of batchSize and invokes
// action(offset, size) for each, with a final short chunk for any remainder.
func doBatches(total, batchSize int64, action func(offset, batch int64)) {
	fullBatches := total / batchSize
	var offset int64
	for i := int64(0); i < fullBatches; i++ {
		action(offset, batchSize)
		offset += batchSize
	}
	if rem := total % batchSize; rem > 0 {
		action(offset, rem)
	}
}
634
// levelSliceOrNil returns the subslice rep[offset:offset+batch], or nil when
// no level slice was provided at all.
func levelSliceOrNil(rep []int16, offset, batch int64) []int16 {
	if rep != nil {
		return rep[offset : offset+batch]
	}
	return nil
}
641
642
// maybeReplaceValidity returns values retained and unchanged when no bits
// buffer is attached (or the array has no buffers); otherwise it returns a
// new array whose validity bitmap buffer is replaced with w.bitsBuffer and
// whose null count is set to newNullCount.
func (w *columnWriter) maybeReplaceValidity(values arrow.Array, newNullCount int64) arrow.Array {
	if w.bitsBuffer == nil {
		values.Retain()
		return values
	}

	if len(values.Data().Buffers()) == 0 {
		values.Retain()
		return values
	}

	buffers := make([]*memory.Buffer, len(values.Data().Buffers()))
	copy(buffers, values.Data().Buffers())
	// buffer 0 is the validity bitmap; swap in the one we computed
	buffers[0] = w.bitsBuffer

	if values.Data().Offset() > 0 {
		data := values.Data()
		// NOTE(review): this re-slices the data buffer assuming 4-byte
		// (int32-width) elements, and the upper bound is len*4 rather than
		// (offset+len)*4 — appears to assume int32-indexed data where this
		// window works out; confirm for other widths/offsets.
		buffers[1] = memory.NewBufferBytes(data.Buffers()[1].Bytes()[data.Offset()*arrow.Int32SizeBytes : data.Len()*arrow.Int32SizeBytes])
	}

	data := array.NewData(values.DataType(), values.Len(), buffers, nil, int(newNullCount), 0)
	defer data.Release()
	return array.MakeFromData(data)
}
669