// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pqarrow import ( "context" "encoding/base64" "fmt" "io" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/flight" "github.com/apache/arrow/go/v15/internal/utils" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/file" "github.com/apache/arrow/go/v15/parquet/metadata" "golang.org/x/xerrors" ) // WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema // and columns will be determined by the schema of the table, writing the file out to the provided writer. // The chunksize will be utilized in order to determine the size of the row groups. func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error { writer, err := NewFileWriter(tbl.Schema(), w, props, arrprops) if err != nil { return err } if err := writer.WriteTable(tbl, chunkSize); err != nil { return err } return writer.Close() } // FileWriter is an object for writing Arrow directly to a parquet file. type FileWriter struct { wr *file.Writer schema *arrow.Schema manifest *SchemaManifest rgw file.RowGroupWriter arrowProps ArrowWriterProperties ctx context.Context colIdx int closed bool } // NewFileWriter returns a writer for writing Arrow directly to a parquetfile, rather than // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing // file.Writer, this will create a new file.Writer based on the schema provided. func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) { if props == nil { props = parquet.NewWriterProperties() } pqschema, err := ToParquet(arrschema, props, arrprops) if err != nil { return nil, err } meta := make(metadata.KeyValueMetadata, 0) for i := 0; i < arrschema.Metadata().Len(); i++ { meta.Append(arrschema.Metadata().Keys()[i], arrschema.Metadata().Values()[i]) } if arrprops.storeSchema { serializedSchema := flight.SerializeSchema(arrschema, props.Allocator()) meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema)) } schemaNode := pqschema.Root() baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta)) manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{}) if err != nil { return nil, err } return &FileWriter{wr: baseWriter, schema: arrschema, manifest: manifest, arrowProps: arrprops, ctx: NewArrowWriteContext(context.TODO(), &arrprops)}, nil } // NewRowGroup does what it says on the tin, creates a new row group in the underlying file. // Equivalent to `AppendRowGroup` on a file.Writer func (fw *FileWriter) NewRowGroup() { if fw.rgw != nil { fw.rgw.Close() } fw.rgw = fw.wr.AppendRowGroup() fw.colIdx = 0 } // NewBufferedRowGroup starts a new memory Buffered Row Group to allow writing columns / records // without immediately flushing them to disk. This allows using WriteBuffered to write records // and decide where to break your row group based on the TotalBytesWritten rather than on the max // row group len. If using Records, this should be paired with WriteBuffered, while // Write will always write a new record as a row group in and of itself. func (fw *FileWriter) NewBufferedRowGroup() { if fw.rgw != nil { fw.rgw.Close() } fw.rgw = fw.wr.AppendBufferedRowGroup() fw.colIdx = 0 } // RowGroupTotalCompressedBytes returns the total number of bytes after compression // that have been written to the current row group so far. func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 { if fw.rgw != nil { return fw.rgw.TotalCompressedBytes() } return 0 } // RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in // the current row group. func (fw *FileWriter) RowGroupTotalBytesWritten() int64 { if fw.rgw != nil { return fw.rgw.TotalBytesWritten() } return 0 } // RowGroupNumRows returns the number of rows written to the current row group. // Returns an error if they are unequal between columns that have been written so far. func (fw *FileWriter) RowGroupNumRows() (int, error) { if fw.rgw != nil { return fw.rgw.NumRows() } return 0, nil } // NumRows returns the total number of rows that have been written so far. func (fw *FileWriter) NumRows() int { if fw.wr != nil { return fw.wr.NumRows() } return 0 } // WriteBuffered will either append to an existing row group or create a new one // based on the record length and max row group length. // // Additionally, it allows to manually break your row group by // checking RowGroupTotalBytesWritten and calling NewBufferedRowGroup, // while Write will always create at least 1 row group for the record. // // Performance-wise WriteBuffered might be more favorable than Write if you're dealing with: // * a loose memory environment (meaning you have a lot of memory to utilize) // * records that have only a small (~<1K?) amount of rows // // More memory is utilized compared to Write as the whole row group data is kept in memory before it's written // since Parquet files must have an entire column written before writing the next column. func (fw *FileWriter) WriteBuffered(rec arrow.Record) error { if !rec.Schema().Equal(fw.schema) { return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema) } var ( recList []arrow.Record maxRows = fw.wr.Properties().MaxRowGroupLength() curRows int err error ) if fw.rgw != nil { if curRows, err = fw.rgw.NumRows(); err != nil { return err } } else { fw.NewBufferedRowGroup() } if int64(curRows)+rec.NumRows() <= maxRows { recList = []arrow.Record{rec} } else { recList = []arrow.Record{rec.NewSlice(0, maxRows-int64(curRows))} defer recList[0].Release() for offset := maxRows - int64(curRows); offset < rec.NumRows(); offset += maxRows { s := rec.NewSlice(offset, offset+utils.Min(maxRows, rec.NumRows()-offset)) defer s.Release() recList = append(recList, s) } } for idx, r := range recList { if idx > 0 { fw.NewBufferedRowGroup() } for i := 0; i < int(r.NumCols()); i++ { if err := fw.WriteColumnData(r.Column(i)); err != nil { fw.Close() return err } } } fw.colIdx = 0 return nil } // Write an arrow Record Batch to the file, respecting the MaxRowGroupLength in the writer // properties to determine whether the record is broken up into more than one row group. // At the very least a single row group is created per record, // so calling Write always results in a new row group added. // // Performance-wise Write might be more favorable than WriteBuffered if you're dealing with: // * a highly-restricted memory environment // * very large records with lots of rows (potentially close to the max row group length) func (fw *FileWriter) Write(rec arrow.Record) error { if !rec.Schema().Equal(fw.schema) { return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema) } var recList []arrow.Record rowgroupLen := fw.wr.Properties().MaxRowGroupLength() if rec.NumRows() > rowgroupLen { recList = make([]arrow.Record, 0) for offset := int64(0); offset < rec.NumRows(); offset += rowgroupLen { s := rec.NewSlice(offset, offset+utils.Min(rowgroupLen, rec.NumRows()-offset)) defer s.Release() recList = append(recList, s) } } else { recList = []arrow.Record{rec} } for _, r := range recList { fw.NewRowGroup() for i := 0; i < int(r.NumCols()); i++ { if err := fw.WriteColumnData(r.Column(i)); err != nil { fw.Close() return err } } } fw.colIdx = 0 return nil } // WriteTable writes an arrow table to the underlying file using chunkSize to determine // the size to break at for making row groups. Writing a table will always create a new // row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will // still write a 0 length Row Group to the file. func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error { if chunkSize <= 0 && tbl.NumRows() > 0 { return xerrors.New("chunk size per row group must be greater than 0") } else if !tbl.Schema().Equal(fw.schema) { return fmt.Errorf("table schema does not match writer's. \nTable: %s\n writer: %s", tbl.Schema(), fw.schema) } else if chunkSize > fw.wr.Properties().MaxRowGroupLength() { chunkSize = fw.wr.Properties().MaxRowGroupLength() } writeRowGroup := func(offset, size int64) error { fw.NewRowGroup() for i := 0; i < int(tbl.NumCols()); i++ { if err := fw.WriteColumnChunked(tbl.Column(i).Data(), offset, size); err != nil { return err } } return nil } if tbl.NumRows() == 0 { if err := writeRowGroup(0, 0); err != nil { fw.Close() return err } return nil } for offset := int64(0); offset < tbl.NumRows(); offset += chunkSize { if err := writeRowGroup(offset, utils.Min(chunkSize, tbl.NumRows()-offset)); err != nil { fw.Close() return err } } return nil } // AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error { return fw.wr.AppendKeyValueMetadata(key, value) } // Close flushes out the data and closes the file. It can be called multiple times, // subsequent calls after the first will have no effect. func (fw *FileWriter) Close() error { if !fw.closed { fw.closed = true if fw.rgw != nil { if err := fw.rgw.Close(); err != nil { return err } } writeCtx := arrowCtxFromContext(fw.ctx) if writeCtx.dataBuffer != nil { writeCtx.dataBuffer.Release() writeCtx.dataBuffer = nil } return fw.wr.Close() } return nil } // WriteColumnChunked will write the data provided to the underlying file, using the provided // offset and size to allow writing subsets of data from the chunked column. It uses the current // column in the underlying row group writer as the starting point, allowing progressive // building of writing columns to a file via arrow data without needing to already have // a record or table. func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error { acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx) if err != nil { return err } fw.colIdx += acw.leafCount return acw.Write(fw.ctx) } // WriteColumnData writes the entire array to the file as the next columns. Like WriteColumnChunked // it is based on the current column of the row group writer allowing progressive building // of the file by columns without needing a full record or table to write. func (fw *FileWriter) WriteColumnData(data arrow.Array) error { chunked := arrow.NewChunked(data.DataType(), []arrow.Array{data}) defer chunked.Release() return fw.WriteColumnChunked(chunked, 0, int64(data.Len())) }