     1  // Licensed to the Apache Software Foundation (ASF) under one
     2  // or more contributor license agreements.  See the NOTICE file
     3  // distributed with this work for additional information
     4  // regarding copyright ownership.  The ASF licenses this file
     5  // to you under the Apache License, Version 2.0 (the
     6  // "License"); you may not use this file except in compliance
     7  // with the License.  You may obtain a copy of the License at
     8  //
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    17  package pqarrow
    19  import (
    20  	"context"
    21  	"encoding/binary"
    22  	"errors"
    23  	"fmt"
    24  	"math"
    25  	"time"
    26  	"unsafe"
    28  	"github.com/apache/arrow/go/v15/arrow"
    29  	"github.com/apache/arrow/go/v15/arrow/array"
    30  	"github.com/apache/arrow/go/v15/arrow/bitutil"
    31  	"github.com/apache/arrow/go/v15/arrow/decimal128"
    32  	"github.com/apache/arrow/go/v15/arrow/decimal256"
    33  	"github.com/apache/arrow/go/v15/arrow/memory"
    34  	"github.com/apache/arrow/go/v15/internal/utils"
    35  	"github.com/apache/arrow/go/v15/parquet"
    36  	"github.com/apache/arrow/go/v15/parquet/file"
    37  	"github.com/apache/arrow/go/v15/parquet/internal/debug"
    38  )
    40  // get the count of the number of leaf arrays for the type
    41  func calcLeafCount(dt arrow.DataType) int {
    42  	switch dt := dt.(type) {
    43  	case arrow.ExtensionType:
    44  		return calcLeafCount(dt.StorageType())
    45  	case arrow.NestedType:
    46  		nleaves := 0
    47  		for _, f := range dt.Fields() {
    48  			nleaves += calcLeafCount(f.Type)
    49  		}
    50  		return nleaves
    51  	case *arrow.DictionaryType:
    52  		return calcLeafCount(dt.ValueType)
    53  	default:
    54  		return 1
    55  	}
    56  }
    58  func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
    59  	curField := field
    60  	nullable := field.Field.Nullable
    61  	for curField != nil {
    62  		nullable = curField.Field.Nullable
    63  		curField = manifest.GetParent(curField)
    64  	}
    65  	return nullable
    66  }
// arrowColumnWriter is a convenience object for easily writing arrow data to a specific
// set of columns in a parquet file. Since a single arrow array can itself be a nested type
// consisting of multiple columns of data, this will write to all of the appropriate leaves in
// the parquet file, allowing easy writing of nested columns.
type arrowColumnWriter struct {
	// builders holds one level builder per non-empty chunk slice to be written.
	builders  []*multipathLevelBuilder
	// leafCount is the number of leaf columns of the data type being written.
	leafCount int
	// colIdx is the index of the first leaf column; Write adds the leaf index
	// to it when requesting columns from a buffered row group writer.
	colIdx    int
	// rgw is the row group writer that supplies the column chunk writers.
	rgw       file.RowGroupWriter
}
    79  // newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
    80  // and the provided schema manifest to determine the paths for writing the columns.
    81  //
    82  // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
    83  // and determine the correct definition and repetition levels manually.
    84  func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
    85  	if data.Len() == 0 {
    86  		return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
    87  	}
    89  	var (
    90  		absPos      int64
    91  		chunkOffset int64
    92  		chunkIdx    int
    93  		values      int64
    94  	)
    96  	for idx, chnk := range data.Chunks() {
    97  		chunkIdx = idx
    98  		if absPos >= offset {
    99  			break
   100  		}
   102  		chunkLen := int64(chnk.Len())
   103  		if absPos+chunkLen > offset {
   104  			chunkOffset = offset - absPos
   105  			break
   106  		}
   108  		absPos += chunkLen
   109  	}
   111  	if absPos >= int64(data.Len()) {
   112  		return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
   113  	}
   115  	leafCount := calcLeafCount(data.DataType())
   116  	isNullable := false
   117  	// row group writer hasn't been advanced yet so add 1 to the current
   118  	// which is the one this instance will start writing for
   119  	// colIdx := rgw.CurrentColumn() + 1
   121  	schemaField, err := manifest.GetColumnField(leafColIdx)
   122  	if err != nil {
   123  		return arrowColumnWriter{}, err
   124  	}
   125  	isNullable = nullableRoot(manifest, schemaField)
   127  	builders := make([]*multipathLevelBuilder, 0)
   128  	for values < size {
   129  		chunk := data.Chunk(chunkIdx)
   130  		available := int64(chunk.Len() - int(chunkOffset))
   131  		chunkWriteSize := utils.Min(size-values, available)
   133  		// the chunk offset will be 0 here except for possibly the first chunk
   134  		// because of the above advancing logic
   135  		arrToWrite := array.NewSlice(chunk, chunkOffset, chunkOffset+chunkWriteSize)
   136  		defer arrToWrite.Release()
   138  		if arrToWrite.Len() > 0 {
   139  			bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
   140  			if err != nil {
   141  				return arrowColumnWriter{}, nil
   142  			}
   143  			if leafCount != bldr.leafCount() {
   144  				return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leaf_count: %d - %d", leafCount, bldr.leafCount())
   145  			}
   146  			builders = append(builders, bldr)
   147  		}
   149  		if chunkWriteSize == available {
   150  			chunkOffset = 0
   151  			chunkIdx++
   152  		}
   153  		values += chunkWriteSize
   154  	}
   156  	return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
   157  }
   159  func (acw *arrowColumnWriter) Write(ctx context.Context) error {
   160  	arrCtx := arrowCtxFromContext(ctx)
   161  	for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
   162  		var (
   163  			cw  file.ColumnChunkWriter
   164  			err error
   165  		)
   167  		if acw.rgw.Buffered() {
   168  			cw, err = acw.rgw.(file.BufferedRowGroupWriter).Column(acw.colIdx + leafIdx)
   169  		} else {
   170  			cw, err = acw.rgw.(file.SerialRowGroupWriter).NextColumn()
   171  		}
   173  		if err != nil {
   174  			return err
   175  		}
   177  		for _, bldr := range acw.builders {
   178  			if leafIdx == 0 {
   179  				defer bldr.Release()
   180  			}
   181  			res, err := bldr.write(leafIdx, arrCtx)
   182  			if err != nil {
   183  				return err
   184  			}
   185  			defer res.Release()
   187  			if len(res.postListVisitedElems) != 1 {
   188  				return errors.New("lists with non-zero length null components are not supported")
   189  			}
   190  			rng := res.postListVisitedElems[0]
   191  			values := array.NewSlice(res.leafArr, rng.start, rng.end)
   192  			defer values.Release()
   193  			if err = WriteArrowToColumn(ctx, cw, values, res.defLevels, res.repLevels, res.leafIsNullable); err != nil {
   194  				return err
   195  			}
   196  		}
   197  	}
   198  	return nil
   199  }
   201  // WriteArrowToColumn writes apache arrow columnar data directly to a ColumnWriter.
   202  // Returns non-nil error if the array data type is not compatible with the concrete
   203  // writer type.
   204  //
   205  // leafArr is always a primitive (possibly dictionary encoded type).
   206  // Leaf_field_nullable indicates whether the leaf array is considered nullable
   207  // according to its schema in a Table or its parent array.
   208  func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error {
   209  	// Leaf nulls are canonical when there is only a single null element after a list
   210  	// and it is at the leaf.
   211  	colLevelInfo := cw.LevelInfo()
   212  	singleNullable := (colLevelInfo.DefLevel == colLevelInfo.RepeatedAncestorDefLevel+1) && leafFieldNullable
   213  	maybeParentNulls := colLevelInfo.HasNullableValues() && !singleNullable
   215  	if maybeParentNulls && !cw.HasBitsBuffer() {
   216  		buf := memory.NewResizableBuffer(cw.Properties().Allocator())
   217  		buf.Resize(int(bitutil.BytesForBits(cw.Properties().WriteBatchSize())))
   218  		cw.SetBitsBuffer(buf)
   219  	}
   221  	arrCtx := arrowCtxFromContext(ctx)
   222  	defer func() {
   223  		if arrCtx.dataBuffer != nil {
   224  			arrCtx.dataBuffer.Release()
   225  			arrCtx.dataBuffer = nil
   226  		}
   227  	}()
   229  	if leafArr.DataType().ID() == arrow.DICTIONARY {
   230  		return writeDictionaryArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls)
   231  	}
   232  	return writeDenseArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls)
   233  }
// binaryarr is satisfied by arrays exposing 32-bit value offsets; writeDenseArrow
// uses it for BINARY and STRING arrays.
type binaryarr interface {
	ValueOffsets() []int32
}
// binary64arr is satisfied by arrays exposing 64-bit value offsets; writeDenseArrow
// uses it for LARGE_BINARY and LARGE_STRING arrays.
type binary64arr interface {
	ValueOffsets() []int64
}
   243  func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
   244  	if leafArr.DataType().ID() == arrow.EXTENSION {
   245  		extensionArray := leafArr.(array.ExtensionArray)
   246  		// Replace leafArr with its underlying storage array
   247  		leafArr = extensionArray.Storage()
   248  	}
   250  	noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0
   252  	if ctx.dataBuffer == nil {
   253  		ctx.dataBuffer = memory.NewResizableBuffer(cw.Properties().Allocator())
   254  	}
   256  	switch wr := cw.(type) {
   257  	case *file.BooleanColumnChunkWriter:
   258  		if leafArr.DataType().ID() != arrow.BOOL {
   259  			return fmt.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID())
   260  		}
   261  		// TODO(mtopol): optimize this so that we aren't converting from
   262  		// the bitmap -> []bool -> bitmap anymore
   263  		if leafArr.Len() == 0 {
   264  			_, err = wr.WriteBatch(nil, defLevels, repLevels)
   265  			break
   266  		}
   268  		ctx.dataBuffer.ResizeNoShrink(leafArr.Len())
   269  		buf := ctx.dataBuffer.Bytes()
   270  		data := *(*[]bool)(unsafe.Pointer(&buf))
   271  		for idx := range data {
   272  			data[idx] = leafArr.(*array.Boolean).Value(idx)
   273  		}
   274  		if !maybeParentNulls && noNulls {
   275  			wr.WriteBatch(data, defLevels, repLevels)
   276  		} else {
   277  			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
   278  		}
   279  	case *file.Int32ColumnChunkWriter:
   280  		var data []int32
   281  		switch leafArr.DataType().ID() {
   282  		case arrow.INT32:
   283  			data = leafArr.(*array.Int32).Int32Values()
   284  		case arrow.DATE32, arrow.UINT32:
   285  			if leafArr.Data().Buffers()[1] != nil {
   286  				data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
   287  				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
   288  			}
   289  		case arrow.TIME32:
   290  			if leafArr.DataType().(*arrow.Time32Type).Unit != arrow.Second {
   291  				if leafArr.Data().Buffers()[1] != nil {
   292  					data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
   293  					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
   294  				}
   295  			} else { // coerce time32 if necessary by multiplying by 1000
   296  				ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
   297  				data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   298  				for idx, val := range leafArr.(*array.Time32).Time32Values() {
   299  					data[idx] = int32(val) * 1000
   300  				}
   301  			}
   302  		case arrow.NULL:
   303  			wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0)
   304  			return
   306  		default:
   307  			// simple integral cases, parquet physical storage is int32 or int64
   308  			// so we have to create a new array of int32's for anything smaller than
   309  			// 32-bits
   310  			ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
   311  			data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   312  			switch leafArr.DataType().ID() {
   313  			case arrow.UINT8:
   314  				for idx, val := range leafArr.(*array.Uint8).Uint8Values() {
   315  					data[idx] = int32(val)
   316  				}
   317  			case arrow.INT8:
   318  				for idx, val := range leafArr.(*array.Int8).Int8Values() {
   319  					data[idx] = int32(val)
   320  				}
   321  			case arrow.UINT16:
   322  				for idx, val := range leafArr.(*array.Uint16).Uint16Values() {
   323  					data[idx] = int32(val)
   324  				}
   325  			case arrow.INT16:
   326  				for idx, val := range leafArr.(*array.Int16).Int16Values() {
   327  					data[idx] = int32(val)
   328  				}
   329  			case arrow.DATE64:
   330  				for idx, val := range leafArr.(*array.Date64).Date64Values() {
   331  					data[idx] = int32(val / 86400000) // coerce date64 values
   332  				}
   333  			case arrow.DECIMAL128:
   334  				for idx, val := range leafArr.(*array.Decimal128).Values() {
   335  					debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
   336  					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
   337  					data[idx] = int32(val.LowBits())
   338  				}
   339  			case arrow.DECIMAL256:
   340  				for idx, val := range leafArr.(*array.Decimal256).Values() {
   341  					debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
   342  					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
   343  					data[idx] = int32(val.LowBits())
   344  				}
   345  			default:
   346  				return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name())
   347  			}
   348  		}
   350  		if !maybeParentNulls && noNulls {
   351  			_, err = wr.WriteBatch(data, defLevels, repLevels)
   352  		} else {
   353  			nulls := leafArr.NullBitmapBytes()
   354  			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
   355  		}
   356  	case *file.Int64ColumnChunkWriter:
   357  		var data []int64
   358  		switch leafArr.DataType().ID() {
   359  		case arrow.TIMESTAMP:
   360  			tstype := leafArr.DataType().(*arrow.TimestampType)
   361  			if ctx.props.coerceTimestamps {
   362  				// user explicitly requested coercion to specific unit
   363  				if tstype.Unit == ctx.props.coerceTimestampUnit {
   364  					// no conversion necessary
   365  					if leafArr.Data().Buffers()[1] != nil {
   366  						data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
   367  						data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
   368  					}
   369  				} else {
   370  					ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   371  					data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   372  					if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &ctx.props, data); err != nil {
   373  						return err
   374  					}
   375  				}
   376  			} else if (cw.Properties().Version() == parquet.V1_0 || cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
   377  				// absent superceding user instructions, when writing a Parquet Version <=2.4 File,
   378  				// timestamps in nanoseconds are coerced to microseconds
   379  				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   380  				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   381  				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), WithTruncatedTimestamps(true))
   382  				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
   383  					return err
   384  				}
   385  			} else if tstype.Unit == arrow.Second {
   386  				// absent superceding user instructions, timestamps in seconds are coerced
   387  				// to milliseconds
   388  				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Millisecond))
   389  				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   390  				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   391  				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
   392  					return err
   393  				}
   394  			} else {
   395  				// no data conversion necessary
   396  				if leafArr.Data().Buffers()[1] != nil {
   397  					data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
   398  					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
   399  				}
   400  			}
   401  		case arrow.UINT32:
   402  			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   403  			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   404  			for idx, val := range leafArr.(*array.Uint32).Uint32Values() {
   405  				data[idx] = int64(val)
   406  			}
   407  		case arrow.INT64:
   408  			data = leafArr.(*array.Int64).Int64Values()
   409  		case arrow.UINT64, arrow.TIME64, arrow.DATE64:
   410  			if leafArr.Data().Buffers()[1] != nil {
   411  				data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
   412  				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
   413  			}
   414  		case arrow.DECIMAL128:
   415  			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   416  			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   417  			for idx, val := range leafArr.(*array.Decimal128).Values() {
   418  				debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
   419  				data[idx] = int64(val.LowBits())
   420  			}
   421  		case arrow.DECIMAL256:
   422  			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
   423  			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   424  			for idx, val := range leafArr.(*array.Decimal256).Values() {
   425  				debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
   426  				data[idx] = int64(val.LowBits())
   427  			}
   428  		default:
   429  			return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name())
   430  		}
   432  		if !maybeParentNulls && noNulls {
   433  			_, err = wr.WriteBatch(data, defLevels, repLevels)
   434  		} else {
   435  			nulls := leafArr.NullBitmapBytes()
   436  			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
   437  		}
   438  	case *file.Int96ColumnChunkWriter:
   439  		if leafArr.DataType().ID() != arrow.TIMESTAMP {
   440  			return errors.New("unsupported arrow type to write to Int96 column")
   441  		}
   442  		ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
   443  		data := parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
   444  		input := leafArr.(*array.Timestamp).TimestampValues()
   445  		unit := leafArr.DataType().(*arrow.TimestampType).Unit
   446  		for idx, val := range input {
   447  			arrowTimestampToImpalaTimestamp(unit, int64(val), &data[idx])
   448  		}
   450  		if !maybeParentNulls && noNulls {
   451  			_, err = wr.WriteBatch(data, defLevels, repLevels)
   452  		} else {
   453  			nulls := leafArr.NullBitmapBytes()
   454  			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
   455  		}
   456  	case *file.Float32ColumnChunkWriter:
   457  		if leafArr.DataType().ID() != arrow.FLOAT32 {
   458  			return errors.New("invalid column type to write to Float")
   459  		}
   460  		if !maybeParentNulls && noNulls {
   461  			_, err = wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
   462  		} else {
   463  			wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
   464  		}
   465  	case *file.Float64ColumnChunkWriter:
   466  		if leafArr.DataType().ID() != arrow.FLOAT64 {
   467  			return errors.New("invalid column type to write to Float")
   468  		}
   469  		if !maybeParentNulls && noNulls {
   470  			_, err = wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
   471  		} else {
   472  			wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
   473  		}
   474  	case *file.ByteArrayColumnChunkWriter:
   475  		var (
   476  			buffer   = leafArr.Data().Buffers()[2]
   477  			valueBuf []byte
   478  		)
   480  		if buffer == nil {
   481  			valueBuf = []byte{}
   482  		} else {
   483  			valueBuf = buffer.Bytes()
   484  		}
   486  		data := make([]parquet.ByteArray, leafArr.Len())
   487  		switch leafArr.DataType().ID() {
   488  		case arrow.BINARY, arrow.STRING:
   489  			offsets := leafArr.(binaryarr).ValueOffsets()
   490  			for i := range data {
   491  				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
   492  			}
   493  		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
   494  			offsets := leafArr.(binary64arr).ValueOffsets()
   495  			for i := range data {
   496  				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
   497  			}
   498  		default:
   499  			return fmt.Errorf("%w: invalid column type to write to ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
   500  		}
   502  		if !maybeParentNulls && noNulls {
   503  			_, err = wr.WriteBatch(data, defLevels, repLevels)
   504  		} else {
   505  			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
   506  		}
   508  	case *file.FixedLenByteArrayColumnChunkWriter:
   509  		switch dt := leafArr.DataType().(type) {
   510  		case *arrow.FixedSizeBinaryType:
   511  			data := make([]parquet.FixedLenByteArray, leafArr.Len())
   512  			for idx := range data {
   513  				data[idx] = leafArr.(*array.FixedSizeBinary).Value(idx)
   514  			}
   515  			if !maybeParentNulls && noNulls {
   516  				_, err = wr.WriteBatch(data, defLevels, repLevels)
   517  			} else {
   518  				wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
   519  			}
   520  		case *arrow.Decimal128Type:
   521  			// parquet decimal are stored with FixedLength values where the length is
   522  			// proportional to the precision. Arrow's Decimal are always stored with 16/32
   523  			// bytes. thus the internal FLBA must be adjusted by the offset calculation
   524  			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
   525  			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
   526  			scratch := ctx.dataBuffer.Bytes()
   527  			typeLen := wr.Descr().TypeLength()
   528  			fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray {
   529  				out := scratch[offset : offset+typeLen]
   530  				binary.BigEndian.PutUint64(scratch, uint64(in.HighBits()))
   531  				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], in.LowBits())
   532  				scratch = scratch[2*arrow.Uint64SizeBytes:]
   533  				return out
   534  			}
   536  			data := make([]parquet.FixedLenByteArray, leafArr.Len())
   537  			arr := leafArr.(*array.Decimal128)
   538  			if leafArr.NullN() == 0 {
   539  				for idx := range data {
   540  					data[idx] = fixDecimalEndianness(arr.Value(idx))
   541  				}
   542  				_, err = wr.WriteBatch(data, defLevels, repLevels)
   543  			} else {
   544  				for idx := range data {
   545  					if arr.IsValid(idx) {
   546  						data[idx] = fixDecimalEndianness(arr.Value(idx))
   547  					}
   548  				}
   549  				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
   550  			}
   551  		case *arrow.Decimal256Type:
   552  			// parquet decimal are stored with FixedLength values where the length is
   553  			// proportional to the precision. Arrow's Decimal are always stored with 16/32
   554  			// bytes. thus the internal FLBA must be adjusted by the offset calculation
   555  			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
   556  			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
   557  			scratch := ctx.dataBuffer.Bytes()
   558  			typeLen := wr.Descr().TypeLength()
   559  			fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray {
   560  				out := scratch[offset : offset+typeLen]
   561  				vals := in.Array()
   562  				binary.BigEndian.PutUint64(scratch, vals[3])
   563  				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2])
   564  				binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1])
   565  				binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0])
   566  				scratch = scratch[4*arrow.Uint64SizeBytes:]
   567  				return out
   568  			}
   570  			data := make([]parquet.FixedLenByteArray, leafArr.Len())
   571  			arr := leafArr.(*array.Decimal256)
   572  			if leafArr.NullN() == 0 {
   573  				for idx := range data {
   574  					data[idx] = fixDecimalEndianness(arr.Value(idx))
   575  				}
   576  				_, err = wr.WriteBatch(data, defLevels, repLevels)
   577  			} else {
   578  				for idx := range data {
   579  					if arr.IsValid(idx) {
   580  						data[idx] = fixDecimalEndianness(arr.Value(idx))
   581  					}
   582  				}
   583  				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
   584  			}
   585  		case *arrow.Float16Type:
   586  			typeLen := wr.Descr().TypeLength()
   587  			if typeLen != arrow.Float16SizeBytes {
   588  				return fmt.Errorf("%w: invalid FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid, typeLen)
   589  			}
   591  			arr := leafArr.(*array.Float16)
   592  			rawValues := arrow.Float16Traits.CastToBytes(arr.Values())
   593  			data := make([]parquet.FixedLenByteArray, arr.Len())
   595  			if arr.NullN() == 0 {
   596  				for idx := range data {
   597  					offset := idx * typeLen
   598  					data[idx] = rawValues[offset : offset+typeLen]
   599  				}
   600  				_, err = wr.WriteBatch(data, defLevels, repLevels)
   601  			} else {
   602  				for idx := range data {
   603  					if arr.IsValid(idx) {
   604  						offset := idx * typeLen
   605  						data[idx] = rawValues[offset : offset+typeLen]
   606  					}
   607  				}
   608  				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
   609  			}
   610  		default:
   611  			return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
   612  		}
   613  	default:
   614  		return errors.New("unknown column writer physical type")
   615  	}
   616  	return
   617  }
// coerceType identifies the arithmetic operation used to convert a timestamp
// from one time unit to another.
type coerceType int8

const (
	// coerceInvalid is the zero value and marks a conversion that is not supported.
	coerceInvalid coerceType = iota
	// coerceDivide converts by dividing the value by the factor.
	coerceDivide
	// coerceMultiply converts by multiplying the value by the factor.
	coerceMultiply
)
// coercePair describes a single unit conversion: the operation to apply and
// the factor to apply it with.
type coercePair struct {
	typ    coerceType
	factor int64
}
// factors is the timestamp coercion table, indexed first by the source unit and
// then by the target unit. Every entry whose target is arrow.Second is
// coerceInvalid; the remaining same-unit entries multiply by 1.
var factors = map[arrow.TimeUnit]map[arrow.TimeUnit]coercePair{
	arrow.Second: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceMultiply, 1000},
		arrow.Microsecond: {coerceMultiply, 1000000},
		arrow.Nanosecond:  {coerceMultiply, 1000000000},
	},
	arrow.Millisecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceMultiply, 1},
		arrow.Microsecond: {coerceMultiply, 1000},
		arrow.Nanosecond:  {coerceMultiply, 1000000},
	},
	arrow.Microsecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceDivide, 1000},
		arrow.Microsecond: {coerceMultiply, 1},
		arrow.Nanosecond:  {coerceMultiply, 1000},
	},
	arrow.Nanosecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceDivide, 1000000},
		arrow.Microsecond: {coerceDivide, 1000},
		arrow.Nanosecond:  {coerceMultiply, 1},
	},
}
   659  func writeCoerceTimestamps(arr *array.Timestamp, props *ArrowWriterProperties, out []int64) error {
   660  	source := arr.DataType().(*arrow.TimestampType).Unit
   661  	target := props.coerceTimestampUnit
   662  	truncation := props.allowTruncatedTimestamps
   664  	vals := arr.TimestampValues()
   665  	multiply := func(factor int64) error {
   666  		for idx, val := range vals {
   667  			out[idx] = int64(val) * factor
   668  		}
   669  		return nil
   670  	}
   672  	divide := func(factor int64) error {
   673  		for idx, val := range vals {
   674  			if !truncation && arr.IsValid(idx) && (int64(val)%factor != 0) {
   675  				return fmt.Errorf("casting from %s to %s would lose data", source, target)
   676  			}
   677  			out[idx] = int64(val) / factor
   678  		}
   679  		return nil
   680  	}
   682  	coerce := factors[source][target]
   683  	switch coerce.typ {
   684  	case coerceMultiply:
   685  		return multiply(coerce.factor)
   686  	case coerceDivide:
   687  		return divide(coerce.factor)
   688  	default:
   689  		panic("invalid coercion")
   690  	}
   691  }
const (
	// julianEpochOffsetDays is added to a whole-day count derived from an arrow
	// timestamp to obtain the Julian day stored in an Int96 value.
	julianEpochOffsetDays int64 = 2440588
	// nanoSecondsPerDay is the number of nanoseconds in 24 hours.
	nanoSecondsPerDay           = 24 * 60 * 60 * 1000 * 1000 * 1000
)
   698  func arrowTimestampToImpalaTimestamp(unit arrow.TimeUnit, t int64, out *parquet.Int96) {
   699  	var d time.Duration
   700  	switch unit {
   701  	case arrow.Second:
   702  		d = time.Duration(t) * time.Second
   703  	case arrow.Microsecond:
   704  		d = time.Duration(t) * time.Microsecond
   705  	case arrow.Millisecond:
   706  		d = time.Duration(t) * time.Millisecond
   707  	case arrow.Nanosecond:
   708  		d = time.Duration(t) * time.Nanosecond
   709  	}
   711  	julianDays := (int64(d.Hours()) / 24) + julianEpochOffsetDays
   712  	lastDayNanos := t % (nanoSecondsPerDay)
   713  	binary.LittleEndian.PutUint64((*out)[:8], uint64(lastDayNanos))
   714  	binary.LittleEndian.PutUint32((*out)[8:], uint32(julianDays))
   715  }

