...

Source file src/github.com/apache/arrow/go/v15/parquet/pqarrow/file_writer.go

Documentation: github.com/apache/arrow/go/v15/parquet/pqarrow

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package pqarrow
    18  
    19  import (
    20  	"context"
    21  	"encoding/base64"
    22  	"fmt"
    23  	"io"
    24  
    25  	"github.com/apache/arrow/go/v15/arrow"
    26  	"github.com/apache/arrow/go/v15/arrow/flight"
    27  	"github.com/apache/arrow/go/v15/internal/utils"
    28  	"github.com/apache/arrow/go/v15/parquet"
    29  	"github.com/apache/arrow/go/v15/parquet/file"
    30  	"github.com/apache/arrow/go/v15/parquet/metadata"
    31  	"golang.org/x/xerrors"
    32  )
    33  
    34  // WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema
    35  // and columns will be determined by the schema of the table, writing the file out to the provided writer.
    36  // The chunksize will be utilized in order to determine the size of the row groups.
    37  func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error {
    38  	writer, err := NewFileWriter(tbl.Schema(), w, props, arrprops)
    39  	if err != nil {
    40  		return err
    41  	}
    42  
    43  	if err := writer.WriteTable(tbl, chunkSize); err != nil {
    44  		return err
    45  	}
    46  
    47  	return writer.Close()
    48  }
    49  
    50  // FileWriter is an object for writing Arrow directly to a parquet file.
    51  type FileWriter struct {
    52  	wr         *file.Writer
    53  	schema     *arrow.Schema
    54  	manifest   *SchemaManifest
    55  	rgw        file.RowGroupWriter
    56  	arrowProps ArrowWriterProperties
    57  	ctx        context.Context
    58  	colIdx     int
    59  	closed     bool
    60  }
    61  
    62  // NewFileWriter returns a writer for writing Arrow directly to a parquetfile, rather than
    63  // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing
    64  // file.Writer, this will create a new file.Writer based on the schema provided.
    65  func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) {
    66  	if props == nil {
    67  		props = parquet.NewWriterProperties()
    68  	}
    69  
    70  	pqschema, err := ToParquet(arrschema, props, arrprops)
    71  	if err != nil {
    72  		return nil, err
    73  	}
    74  
    75  	meta := make(metadata.KeyValueMetadata, 0)
    76  	for i := 0; i < arrschema.Metadata().Len(); i++ {
    77  		meta.Append(arrschema.Metadata().Keys()[i], arrschema.Metadata().Values()[i])
    78  	}
    79  
    80  	if arrprops.storeSchema {
    81  		serializedSchema := flight.SerializeSchema(arrschema, props.Allocator())
    82  		meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema))
    83  	}
    84  
    85  	schemaNode := pqschema.Root()
    86  	baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
    87  
    88  	manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{})
    89  	if err != nil {
    90  		return nil, err
    91  	}
    92  
    93  	return &FileWriter{wr: baseWriter, schema: arrschema, manifest: manifest, arrowProps: arrprops, ctx: NewArrowWriteContext(context.TODO(), &arrprops)}, nil
    94  }
    95  
    96  // NewRowGroup does what it says on the tin, creates a new row group in the underlying file.
    97  // Equivalent to `AppendRowGroup` on a file.Writer
    98  func (fw *FileWriter) NewRowGroup() {
    99  	if fw.rgw != nil {
   100  		fw.rgw.Close()
   101  	}
   102  	fw.rgw = fw.wr.AppendRowGroup()
   103  	fw.colIdx = 0
   104  }
   105  
   106  // NewBufferedRowGroup starts a new memory Buffered Row Group to allow writing columns / records
   107  // without immediately flushing them to disk. This allows using WriteBuffered to write records
   108  // and decide where to break your row group based on the TotalBytesWritten rather than on the max
   109  // row group len. If using Records, this should be paired with WriteBuffered, while
   110  // Write will always write a new record as a row group in and of itself.
   111  func (fw *FileWriter) NewBufferedRowGroup() {
   112  	if fw.rgw != nil {
   113  		fw.rgw.Close()
   114  	}
   115  	fw.rgw = fw.wr.AppendBufferedRowGroup()
   116  	fw.colIdx = 0
   117  }
   118  
   119  // RowGroupTotalCompressedBytes returns the total number of bytes after compression
   120  // that have been written to the current row group so far.
   121  func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 {
   122  	if fw.rgw != nil {
   123  		return fw.rgw.TotalCompressedBytes()
   124  	}
   125  	return 0
   126  }
   127  
   128  // RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in
   129  // the current row group.
   130  func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
   131  	if fw.rgw != nil {
   132  		return fw.rgw.TotalBytesWritten()
   133  	}
   134  	return 0
   135  }
   136  
   137  // RowGroupNumRows returns the number of rows written to the current row group.
   138  // Returns an error if they are unequal between columns that have been written so far.
   139  func (fw *FileWriter) RowGroupNumRows() (int, error) {
   140  	if fw.rgw != nil {
   141  		return fw.rgw.NumRows()
   142  	}
   143  	return 0, nil
   144  }
   145  
   146  // NumRows returns the total number of rows that have been written so far.
   147  func (fw *FileWriter) NumRows() int {
   148  	if fw.wr != nil {
   149  		return fw.wr.NumRows()
   150  	}
   151  	return 0
   152  }
   153  
   154  // WriteBuffered will either append to an existing row group or create a new one
   155  // based on the record length and max row group length.
   156  //
   157  // Additionally, it allows to manually break your row group by
   158  // checking RowGroupTotalBytesWritten and calling NewBufferedRowGroup,
   159  // while Write will always create at least 1 row group for the record.
   160  //
   161  // Performance-wise WriteBuffered might be more favorable than Write if you're dealing with:
   162  // * a loose memory environment (meaning you have a lot of memory to utilize)
   163  // * records that have only a small (~<1K?) amount of rows
   164  //
   165  // More memory is utilized compared to Write as the whole row group data is kept in memory before it's written
   166  // since Parquet files must have an entire column written before writing the next column.
   167  func (fw *FileWriter) WriteBuffered(rec arrow.Record) error {
   168  	if !rec.Schema().Equal(fw.schema) {
   169  		return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
   170  	}
   171  
   172  	var (
   173  		recList []arrow.Record
   174  		maxRows = fw.wr.Properties().MaxRowGroupLength()
   175  		curRows int
   176  		err     error
   177  	)
   178  	if fw.rgw != nil {
   179  		if curRows, err = fw.rgw.NumRows(); err != nil {
   180  			return err
   181  		}
   182  	} else {
   183  		fw.NewBufferedRowGroup()
   184  	}
   185  
   186  	if int64(curRows)+rec.NumRows() <= maxRows {
   187  		recList = []arrow.Record{rec}
   188  	} else {
   189  		recList = []arrow.Record{rec.NewSlice(0, maxRows-int64(curRows))}
   190  		defer recList[0].Release()
   191  		for offset := maxRows - int64(curRows); offset < rec.NumRows(); offset += maxRows {
   192  			s := rec.NewSlice(offset, offset+utils.Min(maxRows, rec.NumRows()-offset))
   193  			defer s.Release()
   194  			recList = append(recList, s)
   195  		}
   196  	}
   197  
   198  	for idx, r := range recList {
   199  		if idx > 0 {
   200  			fw.NewBufferedRowGroup()
   201  		}
   202  		for i := 0; i < int(r.NumCols()); i++ {
   203  			if err := fw.WriteColumnData(r.Column(i)); err != nil {
   204  				fw.Close()
   205  				return err
   206  			}
   207  		}
   208  	}
   209  	fw.colIdx = 0
   210  	return nil
   211  }
   212  
   213  // Write an arrow Record Batch to the file, respecting the MaxRowGroupLength in the writer
   214  // properties to determine whether the record is broken up into more than one row group.
   215  // At the very least a single row group is created per record,
   216  // so calling Write always results in a new row group added.
   217  //
   218  // Performance-wise Write might be more favorable than WriteBuffered if you're dealing with:
   219  // * a highly-restricted memory environment
   220  // * very large records with lots of rows (potentially close to the max row group length)
   221  func (fw *FileWriter) Write(rec arrow.Record) error {
   222  	if !rec.Schema().Equal(fw.schema) {
   223  		return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
   224  	}
   225  
   226  	var recList []arrow.Record
   227  	rowgroupLen := fw.wr.Properties().MaxRowGroupLength()
   228  	if rec.NumRows() > rowgroupLen {
   229  		recList = make([]arrow.Record, 0)
   230  		for offset := int64(0); offset < rec.NumRows(); offset += rowgroupLen {
   231  			s := rec.NewSlice(offset, offset+utils.Min(rowgroupLen, rec.NumRows()-offset))
   232  			defer s.Release()
   233  			recList = append(recList, s)
   234  		}
   235  	} else {
   236  		recList = []arrow.Record{rec}
   237  	}
   238  
   239  	for _, r := range recList {
   240  		fw.NewRowGroup()
   241  		for i := 0; i < int(r.NumCols()); i++ {
   242  			if err := fw.WriteColumnData(r.Column(i)); err != nil {
   243  				fw.Close()
   244  				return err
   245  			}
   246  		}
   247  	}
   248  	fw.colIdx = 0
   249  	return nil
   250  }
   251  
   252  // WriteTable writes an arrow table to the underlying file using chunkSize to determine
   253  // the size to break at for making row groups. Writing a table will always create a new
   254  // row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will
   255  // still write a 0 length Row Group to the file.
   256  func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error {
   257  	if chunkSize <= 0 && tbl.NumRows() > 0 {
   258  		return xerrors.New("chunk size per row group must be greater than 0")
   259  	} else if !tbl.Schema().Equal(fw.schema) {
   260  		return fmt.Errorf("table schema does not match writer's. \nTable: %s\n writer: %s", tbl.Schema(), fw.schema)
   261  	} else if chunkSize > fw.wr.Properties().MaxRowGroupLength() {
   262  		chunkSize = fw.wr.Properties().MaxRowGroupLength()
   263  	}
   264  
   265  	writeRowGroup := func(offset, size int64) error {
   266  		fw.NewRowGroup()
   267  		for i := 0; i < int(tbl.NumCols()); i++ {
   268  			if err := fw.WriteColumnChunked(tbl.Column(i).Data(), offset, size); err != nil {
   269  				return err
   270  			}
   271  		}
   272  		return nil
   273  	}
   274  
   275  	if tbl.NumRows() == 0 {
   276  		if err := writeRowGroup(0, 0); err != nil {
   277  			fw.Close()
   278  			return err
   279  		}
   280  		return nil
   281  	}
   282  
   283  	for offset := int64(0); offset < tbl.NumRows(); offset += chunkSize {
   284  		if err := writeRowGroup(offset, utils.Min(chunkSize, tbl.NumRows()-offset)); err != nil {
   285  			fw.Close()
   286  			return err
   287  		}
   288  	}
   289  	return nil
   290  }
   291  
   292  // AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
   293  func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error {
   294  	return fw.wr.AppendKeyValueMetadata(key, value)
   295  }
   296  
   297  // Close flushes out the data and closes the file. It can be called multiple times,
   298  // subsequent calls after the first will have no effect.
   299  func (fw *FileWriter) Close() error {
   300  	if !fw.closed {
   301  		fw.closed = true
   302  		if fw.rgw != nil {
   303  			if err := fw.rgw.Close(); err != nil {
   304  				return err
   305  			}
   306  		}
   307  
   308  		writeCtx := arrowCtxFromContext(fw.ctx)
   309  		if writeCtx.dataBuffer != nil {
   310  			writeCtx.dataBuffer.Release()
   311  			writeCtx.dataBuffer = nil
   312  		}
   313  
   314  		return fw.wr.Close()
   315  	}
   316  	return nil
   317  }
   318  
   319  // WriteColumnChunked will write the data provided to the underlying file, using the provided
   320  // offset and size to allow writing subsets of data from the chunked column. It uses the current
   321  // column in the underlying row group writer as the starting point, allowing progressive
   322  // building of writing columns to a file via arrow data without needing to already have
   323  // a record or table.
   324  func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
   325  	acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
   326  	if err != nil {
   327  		return err
   328  	}
   329  	fw.colIdx += acw.leafCount
   330  	return acw.Write(fw.ctx)
   331  }
   332  
   333  // WriteColumnData writes the entire array to the file as the next columns. Like WriteColumnChunked
   334  // it is based on the current column of the row group writer allowing progressive building
   335  // of the file by columns without needing a full record or table to write.
   336  func (fw *FileWriter) WriteColumnData(data arrow.Array) error {
   337  	chunked := arrow.NewChunked(data.DataType(), []arrow.Array{data})
   338  	defer chunked.Release()
   339  	return fw.WriteColumnChunked(chunked, 0, int64(data.Len()))
   340  }
   341  

View as plain text