// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package file import ( "bytes" "sync" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/compress" "github.com/apache/arrow/go/v15/parquet/internal/encoding" "github.com/apache/arrow/go/v15/parquet/internal/encryption" format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet" "github.com/apache/arrow/go/v15/parquet/internal/thrift" "github.com/apache/arrow/go/v15/parquet/internal/utils" "github.com/apache/arrow/go/v15/parquet/metadata" libthrift "github.com/apache/thrift/lib/go/thrift" "golang.org/x/xerrors" ) // PageWriter is the interface for both serialized and buffered page writers type PageWriter interface { // Closes the current page, flushing any buffered data pages/dictionary pages // based on the input parameters. Subsequent calls have no effect. Close(hasDict, fallback bool) error // Write the provided datapage out to the underlying writer WriteDataPage(page DataPage) (int64, error) // Write the provided dictionary page out to the underlying writer WriteDictionaryPage(page *DictionaryPage) (int64, error) // returns true if there is a configured compressor for the data HasCompressor() bool // use the configured compressor and writer properties to compress the data in src // using the buffer buf. Returns the slice of the compressed bytes which may be // the bytes in the provided buffer Compress(buf *bytes.Buffer, src []byte) []byte // Allow reuse of the pagewriter object by resetting it using these values instead // of having to create a new object. Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error } type serializedPageWriter struct { mem memory.Allocator metaData *metadata.ColumnChunkMetaDataBuilder sink utils.WriterTell nvalues int64 dictPageOffset int64 dataPageOffset int64 totalUncompressed int64 totalCompressed int64 pageOrdinal int16 rgOrdinal int16 columnOrdinal int16 compressLevel int compressor compress.Codec metaEncryptor encryption.Encryptor dataEncryptor encryption.Encryptor encryptionBuf bytes.Buffer dataPageAAD []byte dataPageHeaderAAD []byte dictEncodingStats map[parquet.Encoding]int32 dataEncodingStats map[parquet.Encoding]int32 thriftSerializer *thrift.Serializer } func createSerializedPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) { var ( compressor compress.Codec err error ) if codec != compress.Codecs.Uncompressed { compressor, err = compress.GetCodec(codec) if err != nil { return nil, err } } pgwriter := &serializedPageWriter{ sink: sink, compressor: compressor, compressLevel: compressionLevel, metaData: metadata, rgOrdinal: rowGroupOrdinal, columnOrdinal: columnChunkOrdinal, mem: mem, metaEncryptor: metaEncryptor, dataEncryptor: dataEncryptor, dictEncodingStats: make(map[parquet.Encoding]int32), dataEncodingStats: make(map[parquet.Encoding]int32), thriftSerializer: thrift.NewThriftSerializer(), } if metaEncryptor != nil || dataEncryptor != nil { pgwriter.initEncryption() } return pgwriter, nil } // NewPageWriter returns a page writer using either the buffered or serialized implementations func NewPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, buffered bool, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) { if buffered { return newBufferedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor) } return createSerializedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor) } // Reset allows reusing the pagewriter object instead of creating a new one. func (pw *serializedPageWriter) Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error { var ( compressor compress.Codec err error ) if codec != compress.Codecs.Uncompressed { compressor, err = compress.GetCodec(codec) if err != nil { return err } } pw.sink = sink pw.compressor = compressor pw.compressLevel = compressionLevel pw.metaData = metadata pw.rgOrdinal = rowGroupOrdinal pw.columnOrdinal = columnChunkOrdinal pw.metaEncryptor = metaEncryptor pw.dataEncryptor = dataEncryptor pw.dictEncodingStats = make(map[parquet.Encoding]int32) pw.dataEncodingStats = make(map[parquet.Encoding]int32) pw.nvalues = 0 pw.dictPageOffset = 0 pw.dataPageOffset = 0 pw.totalUncompressed = 0 pw.totalCompressed = 0 pw.pageOrdinal = 0 if metaEncryptor != nil || dataEncryptor != nil { pw.initEncryption() } return nil } func (pw *serializedPageWriter) initEncryption() { if pw.dataEncryptor != nil { pw.dataPageAAD = []byte(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), encryption.DataPageModule, pw.rgOrdinal, pw.columnOrdinal, -1)) } if pw.metaEncryptor != nil { pw.dataPageHeaderAAD = []byte(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), encryption.DataPageHeaderModule, pw.rgOrdinal, pw.columnOrdinal, -1)) } } func (pw *serializedPageWriter) updateEncryption(moduleType int8) error { switch moduleType { case encryption.ColumnMetaModule: pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1)) case encryption.DataPageModule: encryption.QuickUpdatePageAad(pw.dataPageAAD, pw.pageOrdinal) pw.dataEncryptor.UpdateAad(string(pw.dataPageAAD)) case encryption.DataPageHeaderModule: encryption.QuickUpdatePageAad(pw.dataPageHeaderAAD, pw.pageOrdinal) pw.metaEncryptor.UpdateAad(string(pw.dataPageHeaderAAD)) case encryption.DictPageHeaderModule: pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1)) case encryption.DictPageModule: pw.dataEncryptor.UpdateAad(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1)) default: return xerrors.New("unknown module type in updateencryption") } return nil } func (pw *serializedPageWriter) Close(hasDict, fallback bool) error { if pw.metaEncryptor != nil { pw.updateEncryption(encryption.ColumnMetaModule) } chunkInfo := metadata.ChunkMetaInfo{ NumValues: pw.nvalues, DictPageOffset: pw.dictPageOffset, IndexPageOffset: -1, DataPageOffset: pw.dataPageOffset, CompressedSize: pw.totalCompressed, UncompressedSize: pw.totalUncompressed, } encodingStats := metadata.EncodingStats{ DictEncodingStats: pw.dictEncodingStats, DataEncodingStats: pw.dataEncodingStats, } pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats, pw.metaEncryptor) _, err := pw.metaData.WriteTo(pw.sink) return err } func (pw *serializedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte { maxCompressed := pw.compressor.CompressBound(int64(len(src))) buf.Grow(int(maxCompressed)) return pw.compressor.EncodeLevel(buf.Bytes(), src, pw.compressLevel) } var dataPageV1HeaderPool = sync.Pool{ New: func() interface{} { return format.NewDataPageHeader() }, } func (pw *serializedPageWriter) setDataPageHeader(pageHdr *format.PageHeader, page *DataPageV1) { pageHdr.Type = format.PageType_DATA_PAGE hdr := dataPageV1HeaderPool.Get().(*format.DataPageHeader) hdr.NumValues = page.nvals hdr.Encoding = page.encoding hdr.DefinitionLevelEncoding = page.defLvlEncoding hdr.RepetitionLevelEncoding = page.repLvlEncoding hdr.Statistics = page.statistics.ToThrift() pageHdr.DataPageHeader = hdr pageHdr.DataPageHeaderV2 = nil pageHdr.DictionaryPageHeader = nil } var dataPageV2HeaderPool = sync.Pool{ New: func() interface{} { return format.NewDataPageHeaderV2() }, } func (pw *serializedPageWriter) setDataPageV2Header(pageHdr *format.PageHeader, page *DataPageV2) { pageHdr.Type = format.PageType_DATA_PAGE_V2 hdr := dataPageV2HeaderPool.Get().(*format.DataPageHeaderV2) hdr.NumValues = page.nvals hdr.NumNulls = page.nulls hdr.NumRows = page.nrows hdr.Encoding = page.encoding hdr.DefinitionLevelsByteLength = page.defLvlByteLen hdr.RepetitionLevelsByteLength = page.repLvlByteLen hdr.IsCompressed = page.compressed hdr.Statistics = page.statistics.ToThrift() pageHdr.DataPageHeaderV2 = hdr pageHdr.DataPageHeader = nil pageHdr.DictionaryPageHeader = nil } func (pw *serializedPageWriter) HasCompressor() bool { return pw.compressor != nil } func (pw *serializedPageWriter) NumValues() int64 { return pw.nvalues } func (pw *serializedPageWriter) DictionaryPageOffset() int64 { return pw.dictPageOffset } func (pw *serializedPageWriter) DataPageoffset() int64 { return pw.dataPageOffset } func (pw *serializedPageWriter) TotalCompressedSize() int64 { return pw.totalCompressed } func (pw *serializedPageWriter) TotalUncompressedSize() int64 { return pw.totalUncompressed } func (pw *serializedPageWriter) WriteDictionaryPage(page *DictionaryPage) (int64, error) { uncompressed := len(page.Data()) var data []byte if pw.HasCompressor() { var buffer bytes.Buffer data = pw.Compress(&buffer, page.Data()) // data = buffer.Bytes() } else { data = page.Data() } dictPageHeader := &format.DictionaryPageHeader{ NumValues: page.NumValues(), Encoding: page.Encoding(), IsSorted: libthrift.BoolPtr(page.IsSorted()), } if pw.dataEncryptor != nil { pw.updateEncryption(encryption.DictPageModule) pw.encryptionBuf.Reset() pw.encryptionBuf.Grow(pw.dataEncryptor.CiphertextSizeDelta() + len(data)) pw.dataEncryptor.Encrypt(&pw.encryptionBuf, data) data = pw.encryptionBuf.Bytes() } pageHdr := pageHeaderPool.Get().(*format.PageHeader) defer pageHeaderPool.Put(pageHdr) pageHdr.Type = format.PageType_DICTIONARY_PAGE pageHdr.UncompressedPageSize = int32(uncompressed) pageHdr.CompressedPageSize = int32(len(data)) pageHdr.DictionaryPageHeader = dictPageHeader pageHdr.DataPageHeader = nil pageHdr.DataPageHeaderV2 = nil startPos := pw.sink.Tell() if pw.dictPageOffset == 0 { pw.dictPageOffset = int64(startPos) } if pw.metaEncryptor != nil { if err := pw.updateEncryption(encryption.DictPageHeaderModule); err != nil { return 0, err } } headerSize, err := pw.thriftSerializer.Serialize(pageHdr, pw.sink, pw.metaEncryptor) if err != nil { return 0, err } written, err := pw.sink.Write(data) if err != nil { return 0, err } written += headerSize pw.totalUncompressed += int64(uncompressed + headerSize) pw.totalCompressed = int64(written) pw.dictEncodingStats[parquet.Encoding(page.encoding)]++ return int64(written), nil } var pageHeaderPool = sync.Pool{ New: func() interface{} { return format.NewPageHeader() }, } func (pw *serializedPageWriter) WriteDataPage(page DataPage) (int64, error) { uncompressed := page.UncompressedSize() data := page.Data() if pw.dataEncryptor != nil { if err := pw.updateEncryption(encryption.DataPageModule); err != nil { return 0, err } pw.encryptionBuf.Reset() pw.encryptionBuf.Grow(pw.dataEncryptor.CiphertextSizeDelta() + len(data)) pw.dataEncryptor.Encrypt(&pw.encryptionBuf, data) data = pw.encryptionBuf.Bytes() } pageHdr := pageHeaderPool.Get().(*format.PageHeader) defer pageHeaderPool.Put(pageHdr) pageHdr.UncompressedPageSize = uncompressed pageHdr.CompressedPageSize = int32(len(data)) switch dpage := page.(type) { case *DataPageV1: pw.setDataPageHeader(pageHdr, dpage) defer dataPageV1HeaderPool.Put(pageHdr.DataPageHeader) case *DataPageV2: pw.setDataPageV2Header(pageHdr, dpage) defer dataPageV2HeaderPool.Put(pageHdr.DataPageHeaderV2) default: return 0, xerrors.New("parquet: unexpected page type") } startPos := pw.sink.Tell() if pw.pageOrdinal == 0 { pw.dataPageOffset = int64(startPos) } if pw.metaEncryptor != nil { if err := pw.updateEncryption(encryption.DataPageHeaderModule); err != nil { return 0, err } } headerSize, err := pw.thriftSerializer.Serialize(pageHdr, pw.sink, pw.metaEncryptor) if err != nil { return 0, err } written, err := pw.sink.Write(data) if err != nil { return int64(written), err } written += headerSize pw.totalUncompressed += int64(uncompressed) + int64(headerSize) pw.totalCompressed += int64(written) pw.nvalues += int64(page.NumValues()) pw.dataEncodingStats[parquet.Encoding(page.Encoding())]++ pw.pageOrdinal++ return int64(written), nil } type bufferedPageWriter struct { finalSink utils.WriterTell inMemSink *encoding.BufferWriter metadata *metadata.ColumnChunkMetaDataBuilder pager *serializedPageWriter hasDictionaryPages bool } func newBufferedPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) { wr := &bufferedPageWriter{ finalSink: sink, metadata: metadata, hasDictionaryPages: false, inMemSink: encoding.NewBufferWriter(0, mem), } pager, err := createSerializedPageWriter(wr.inMemSink, codec, compressionLevel, metadata, rgOrdinal, columnOrdinal, mem, metaEncryptor, dataEncryptor) if err != nil { return nil, err } wr.pager = pager.(*serializedPageWriter) return wr, nil } func (bw *bufferedPageWriter) Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error { bw.finalSink = sink bw.metadata = metadata bw.hasDictionaryPages = false bw.inMemSink.Reset(0) return bw.pager.Reset(bw.inMemSink, codec, compressionLevel, metadata, rgOrdinal, columnOrdinal, metaEncryptor, dataEncryptor) } func (bw *bufferedPageWriter) WriteDictionaryPage(page *DictionaryPage) (int64, error) { bw.hasDictionaryPages = true return bw.pager.WriteDictionaryPage(page) } func (bw *bufferedPageWriter) Close(hasDict, fallback bool) error { if bw.pager.metaEncryptor != nil { bw.pager.updateEncryption(encryption.ColumnMetaModule) } position := bw.finalSink.Tell() dictOffset := int64(0) if bw.hasDictionaryPages { dictOffset = bw.pager.DictionaryPageOffset() + position } chunkInfo := metadata.ChunkMetaInfo{ NumValues: bw.pager.NumValues(), DictPageOffset: dictOffset, IndexPageOffset: -1, DataPageOffset: bw.pager.DataPageoffset() + position, CompressedSize: bw.pager.TotalCompressedSize(), UncompressedSize: bw.pager.TotalUncompressedSize(), } encodingStats := metadata.EncodingStats{ DictEncodingStats: bw.pager.dictEncodingStats, DataEncodingStats: bw.pager.dataEncodingStats, } bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats, bw.pager.metaEncryptor) bw.metadata.WriteTo(bw.inMemSink) buf := bw.inMemSink.Finish() defer buf.Release() _, err := bw.finalSink.Write(buf.Bytes()) return err } func (bw *bufferedPageWriter) WriteDataPage(page DataPage) (int64, error) { return bw.pager.WriteDataPage(page) } func (bw *bufferedPageWriter) HasCompressor() bool { return bw.pager.HasCompressor() } func (bw *bufferedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte { return bw.pager.Compress(buf, src) }