...

Source file src/github.com/apache/arrow/go/v15/parquet/file/file_writer.go

Documentation: github.com/apache/arrow/go/v15/parquet/file

     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package file
    18  
    19  import (
    20  	"encoding/binary"
    21  	"fmt"
    22  	"io"
    23  
    24  	"github.com/apache/arrow/go/v15/parquet"
    25  	"github.com/apache/arrow/go/v15/parquet/internal/encryption"
    26  	"github.com/apache/arrow/go/v15/parquet/internal/utils"
    27  	"github.com/apache/arrow/go/v15/parquet/metadata"
    28  	"github.com/apache/arrow/go/v15/parquet/schema"
    29  )
    30  
// Writer is the primary interface for writing a parquet file.
//
// A Writer is created with NewParquetWriter, row groups are added with
// AppendRowGroup or AppendBufferedRowGroup, and Close writes the footer
// and closes the underlying sink.
type Writer struct {
	// sink is the output destination; NewParquetWriter wraps the supplied
	// io.Writer in a utils.TellWrapper.
	sink           utils.WriteCloserTell
	// open is true from construction until Close runs; Close is a no-op
	// once it is false.
	open           bool
	// props are the writer properties in effect for this file.
	props          *parquet.WriterProperties
	// rowGroups counts the row groups appended so far.
	rowGroups      int
	// nrows accumulates the row counts of row group writers that have been
	// finished by appending another row group or by Close.
	nrows          int
	// metadata builds the file-level metadata written by Close.
	metadata       metadata.FileMetaDataBuilder
	// fileEncryptor is set by startFile when the writer properties carry
	// file encryption properties; it is left nil otherwise.
	fileEncryptor  encryption.FileEncryptor
	// rowGroupWriter is the most recently appended row group writer, or nil
	// before the first append and after Close.
	rowGroupWriter *rowGroupWriter

	// The Schema of this writer
	Schema *schema.Schema
}
    45  
// writerConfig collects the settings supplied through WriteOption values
// before NewParquetWriter constructs the Writer.
type writerConfig struct {
	// props are the writer properties; NewParquetWriter substitutes
	// parquet.NewWriterProperties() when this is left nil.
	props            *parquet.WriterProperties
	// keyValueMetadata is handed to the file metadata builder.
	keyValueMetadata metadata.KeyValueMetadata
}
    50  
// WriteOption is a functional option that adjusts the configuration used by
// NewParquetWriter.
type WriteOption func(*writerConfig)
    52  
    53  func WithWriterProps(props *parquet.WriterProperties) WriteOption {
    54  	return func(c *writerConfig) {
    55  		c.props = props
    56  	}
    57  }
    58  
    59  func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
    60  	return func(c *writerConfig) {
    61  		c.keyValueMetadata = meta
    62  	}
    63  }
    64  
    65  // NewParquetWriter returns a Writer that writes to the provided WriteSeeker with the given schema.
    66  //
    67  // If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
    68  // it will be added to the file.
    69  func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
    70  	config := &writerConfig{}
    71  	for _, o := range opts {
    72  		o(config)
    73  	}
    74  	if config.props == nil {
    75  		config.props = parquet.NewWriterProperties()
    76  	}
    77  
    78  	fileSchema := schema.NewSchema(sc)
    79  	fw := &Writer{
    80  		props:  config.props,
    81  		sink:   &utils.TellWrapper{Writer: w},
    82  		open:   true,
    83  		Schema: fileSchema,
    84  	}
    85  
    86  	fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
    87  	fw.startFile()
    88  	return fw
    89  }
    90  
    91  // NumColumns returns the number of columns to write as defined by the schema.
    92  func (fw *Writer) NumColumns() int { return fw.Schema.NumColumns() }
    93  
    94  // NumRowGroups returns the current number of row groups that will be written for this file.
    95  func (fw *Writer) NumRowGroups() int { return fw.rowGroups }
    96  
    97  // NumRows returns the current number of rows that have be written
    98  func (fw *Writer) NumRows() int { return fw.nrows }
    99  
   100  // Properties returns the writer properties that are in use for this file.
   101  func (fw *Writer) Properties() *parquet.WriterProperties { return fw.props }
   102  
   103  // AppendBufferedRowGroup appends a rowgroup to the file and returns a writer
   104  // that buffers the row group in memory allowing writing multiple columns
   105  // at once to the row group. Data is not flushed out until the row group
   106  // is closed.
   107  //
   108  // When calling Close, all columns must have the same number of rows written.
   109  func (fw *Writer) AppendBufferedRowGroup() BufferedRowGroupWriter {
   110  	return fw.appendRowGroup(true)
   111  }
   112  
   113  // AppendRowGroup appends a row group to the file and returns a writer
   114  // that writes columns to the row group in serial via calling NextColumn.
   115  //
   116  // When calling NextColumn, the same number of rows need to have been written
   117  // to each column before moving on. Otherwise the rowgroup writer will panic.
   118  func (fw *Writer) AppendRowGroup() SerialRowGroupWriter {
   119  	return fw.appendRowGroup(false)
   120  }
   121  
   122  func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
   123  	if fw.rowGroupWriter != nil {
   124  		fw.nrows += fw.rowGroupWriter.nrows
   125  		fw.rowGroupWriter.Close()
   126  	}
   127  	fw.rowGroups++
   128  	rgMeta := fw.metadata.AppendRowGroup()
   129  	fw.rowGroupWriter = newRowGroupWriter(fw.sink, rgMeta, int16(fw.rowGroups)-1, fw.props, buffered, fw.fileEncryptor)
   130  	return fw.rowGroupWriter
   131  }
   132  
   133  func (fw *Writer) startFile() {
   134  	encryptionProps := fw.props.FileEncryptionProperties()
   135  	magic := magicBytes
   136  	if encryptionProps != nil {
   137  		// check that all columns in columnEncryptionProperties exist in the schema
   138  		encryptedCols := encryptionProps.EncryptedColumns()
   139  		// if columnEncryptionProperties is empty, every column in the file schema will be encrypted with the footer key
   140  		if len(encryptedCols) != 0 {
   141  			colPaths := make(map[string]bool)
   142  			for i := 0; i < fw.Schema.NumColumns(); i++ {
   143  				colPaths[fw.Schema.Column(i).Path()] = true
   144  			}
   145  			for k := range encryptedCols {
   146  				if _, ok := colPaths[k]; !ok {
   147  					panic("encrypted column " + k + " not found in file schema")
   148  				}
   149  			}
   150  		}
   151  
   152  		fw.fileEncryptor = encryption.NewFileEncryptor(encryptionProps, fw.props.Allocator())
   153  		if encryptionProps.EncryptedFooter() {
   154  			magic = magicEBytes
   155  		}
   156  	}
   157  	n, err := fw.sink.Write(magic)
   158  	if n != 4 || err != nil {
   159  		panic("failed to write magic number")
   160  	}
   161  }
   162  
   163  // AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
   164  func (fw *Writer) AppendKeyValueMetadata(key string, value string) error {
   165  	return fw.metadata.AppendKeyValueMetadata(key, value)
   166  }
   167  
   168  // Close closes any open row group writer and writes the file footer. Subsequent
   169  // calls to close will have no effect.
   170  func (fw *Writer) Close() (err error) {
   171  	if fw.open {
   172  		// if any functions here panic, we set open to be false so
   173  		// that this doesn't get called again
   174  		fw.open = false
   175  		if fw.rowGroupWriter != nil {
   176  			fw.nrows += fw.rowGroupWriter.nrows
   177  			fw.rowGroupWriter.Close()
   178  		}
   179  		fw.rowGroupWriter = nil
   180  		defer func() {
   181  			ierr := fw.sink.Close()
   182  			if err != nil {
   183  				if ierr != nil {
   184  					err = fmt.Errorf("error on close:%w, %s", err, ierr)
   185  				}
   186  				return
   187  			}
   188  
   189  			err = ierr
   190  		}()
   191  
   192  		fileEncryptProps := fw.props.FileEncryptionProperties()
   193  		if fileEncryptProps == nil { // non encrypted file
   194  			fileMetadata, err := fw.metadata.Finish()
   195  			if err != nil {
   196  				return err
   197  			}
   198  
   199  			_, err = writeFileMetadata(fileMetadata, fw.sink)
   200  			return err
   201  		}
   202  
   203  		return fw.closeEncryptedFile(fileEncryptProps)
   204  	}
   205  	return nil
   206  }
   207  
   208  func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) error {
   209  	// encrypted file with encrypted footer
   210  	if props.EncryptedFooter() {
   211  		fileMetadata, err := fw.metadata.Finish()
   212  		if err != nil {
   213  			return err
   214  		}
   215  
   216  		footerLen := int64(0)
   217  
   218  		cryptoMetadata := fw.metadata.GetFileCryptoMetaData()
   219  		n, err := writeFileCryptoMetadata(cryptoMetadata, fw.sink)
   220  		if err != nil {
   221  			return err
   222  		}
   223  
   224  		footerLen += n
   225  		footerEncryptor := fw.fileEncryptor.GetFooterEncryptor()
   226  		n, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerEncryptor, true)
   227  		if err != nil {
   228  			return err
   229  		}
   230  		footerLen += n
   231  
   232  		if err = binary.Write(fw.sink, binary.LittleEndian, uint32(footerLen)); err != nil {
   233  			return err
   234  		}
   235  		if _, err = fw.sink.Write(magicEBytes); err != nil {
   236  			return err
   237  		}
   238  	} else {
   239  		fileMetadata, err := fw.metadata.Finish()
   240  		if err != nil {
   241  			return err
   242  		}
   243  		footerSigningEncryptor := fw.fileEncryptor.GetFooterSigningEncryptor()
   244  		if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
   245  			return err
   246  		}
   247  	}
   248  	if fw.fileEncryptor != nil {
   249  		fw.fileEncryptor.WipeOutEncryptionKeys()
   250  	}
   251  	return nil
   252  }
   253  
   254  func writeFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer) (n int64, err error) {
   255  	n, err = fileMetadata.WriteTo(w, nil)
   256  	if err != nil {
   257  		return
   258  	}
   259  
   260  	if err = binary.Write(w, binary.LittleEndian, uint32(n)); err != nil {
   261  		return
   262  	}
   263  	if _, err = w.Write(magicBytes); err != nil {
   264  		return
   265  	}
   266  	return n + int64(4+len(magicBytes)), nil
   267  }
   268  
   269  func writeEncryptedFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer, encryptor encryption.Encryptor, encryptFooter bool) (n int64, err error) {
   270  	n, err = fileMetadata.WriteTo(w, encryptor)
   271  	if encryptFooter {
   272  		return
   273  	}
   274  	if err != nil {
   275  		return
   276  	}
   277  	if err = binary.Write(w, binary.LittleEndian, uint32(n)); err != nil {
   278  		return
   279  	}
   280  	if _, err = w.Write(magicBytes); err != nil {
   281  		return
   282  	}
   283  	return n + int64(4+len(magicBytes)), nil
   284  }
   285  
   286  func writeFileCryptoMetadata(crypto *metadata.FileCryptoMetadata, w io.Writer) (int64, error) {
   287  	return crypto.WriteTo(w)
   288  }
   289  

View as plain text