// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pqarrow

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"time"
	"unsafe"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/bitutil"
	"github.com/apache/arrow/go/v15/arrow/decimal128"
	"github.com/apache/arrow/go/v15/arrow/decimal256"
	"github.com/apache/arrow/go/v15/arrow/memory"
	"github.com/apache/arrow/go/v15/internal/utils"
	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/file"
	"github.com/apache/arrow/go/v15/parquet/internal/debug"
)

// calcLeafCount returns the number of leaf (primitive) Parquet columns that
// the given Arrow data type maps to, recursing through extension storage
// types, nested children, and dictionary value types.
func calcLeafCount(dt arrow.DataType) int {
	switch dt := dt.(type) {
	case arrow.ExtensionType:
		return calcLeafCount(dt.StorageType())
	case arrow.NestedType:
		nleaves := 0
		for _, f := range dt.Fields() {
			nleaves += calcLeafCount(f.Type)
		}
		return nleaves
	case *arrow.DictionaryType:
		return calcLeafCount(dt.ValueType)
	default:
		return 1
	}
}
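
// Illustrative sketch (not part of the original source): a nested type counts
// only its primitive leaves, matching the physical Parquet columns it
// flattens into.
//
//	dt := arrow.StructOf(
//		arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
//		arrow.Field{Name: "b", Type: arrow.ListOf(arrow.BinaryTypes.String)},
//	)
//	calcLeafCount(dt) // == 2: one leaf for "a", one for the list's string values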

// nullableRoot walks from field up through its ancestors in the manifest and
// reports whether the root ancestor field is nullable, since the root's
// nullability is what matters when building repetition/definition levels.
func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
	curField := field
	nullable := field.Field.Nullable
	for curField != nil {
		nullable = curField.Field.Nullable
		curField = manifest.GetParent(curField)
	}
	return nullable
}
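
// Illustrative note: for a column like "col: list<element: int64 not null>"
// where "col" itself is nullable, nullableRoot returns the nullability of the
// outermost "col" field, not that of the int64 leaf.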

// arrowColumnWriter writes an arrow Chunked array out to one or more parquet
// leaf columns of a row group. A single nested arrow array can map to several
// parquet leaves, so the writer holds one multipathLevelBuilder per chunk and
// fans the data out to leafCount consecutive columns starting at colIdx.
type arrowColumnWriter struct {
	builders  []*multipathLevelBuilder
	leafCount int
	colIdx    int
	rgw       file.RowGroupWriter
}

// newArrowColumnWriter constructs a writer for `size` rows of the chunked
// array starting at `offset`, targeting the row group writer. leafColIdx is
// the index of the first parquet leaf column this array maps onto.
func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
	if data.Len() == 0 {
		return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
	}

	var (
		absPos      int64
		chunkOffset int64
		chunkIdx    int
		values      int64
	)

	// locate the chunk containing the requested offset, and the position of
	// that offset within the chunk
	for idx, chnk := range data.Chunks() {
		chunkIdx = idx
		if absPos >= offset {
			break
		}

		chunkLen := int64(chnk.Len())
		if absPos+chunkLen > offset {
			chunkOffset = offset - absPos
			break
		}

		absPos += chunkLen
	}

	if absPos >= int64(data.Len()) {
		return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
	}

	leafCount := calcLeafCount(data.DataType())
	isNullable := false

	// determine the nullability of the column's root ancestor from the
	// schema manifest
	schemaField, err := manifest.GetColumnField(leafColIdx)
	if err != nil {
		return arrowColumnWriter{}, err
	}
	isNullable = nullableRoot(manifest, schemaField)

	builders := make([]*multipathLevelBuilder, 0)
	for values < size {
		chunk := data.Chunk(chunkIdx)
		available := int64(chunk.Len() - int(chunkOffset))
		chunkWriteSize := utils.Min(size-values, available)

		// chunkOffset is non-zero only for the first chunk written, because
		// of the advancing logic above
		arrToWrite := array.NewSlice(chunk, chunkOffset, chunkOffset+chunkWriteSize)
		defer arrToWrite.Release()

		if arrToWrite.Len() > 0 {
			bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
			if err != nil {
				return arrowColumnWriter{}, err
			}
			if leafCount != bldr.leafCount() {
				return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leaf_count: %d - %d", leafCount, bldr.leafCount())
			}
			builders = append(builders, bldr)
		}

		if chunkWriteSize == available {
			chunkOffset = 0
			chunkIdx++
		}
		values += chunkWriteSize
	}

	return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}
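
// Illustrative note: given chunks of lengths [4, 6] and a request for
// offset=5, size=3, the locating loop lands on chunk 1 with chunkOffset=1,
// and a single builder is created over that chunk's rows [1, 4).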

func (acw *arrowColumnWriter) Write(ctx context.Context) error {
	arrCtx := arrowCtxFromContext(ctx)
	for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
		var (
			cw  file.ColumnChunkWriter
			err error
		)

		// a buffered row group writer can address columns by index, while a
		// serial writer hands out column chunk writers strictly in order
		if acw.rgw.Buffered() {
			cw, err = acw.rgw.(file.BufferedRowGroupWriter).Column(acw.colIdx + leafIdx)
		} else {
			cw, err = acw.rgw.(file.SerialRowGroupWriter).NextColumn()
		}

		if err != nil {
			return err
		}

		for _, bldr := range acw.builders {
			if leafIdx == 0 {
				defer bldr.Release()
			}
			res, err := bldr.write(leafIdx, arrCtx)
			if err != nil {
				return err
			}
			defer res.Release()

			if len(res.postListVisitedElems) != 1 {
				return errors.New("lists with non-zero length null components are not supported")
			}
			rng := res.postListVisitedElems[0]
			values := array.NewSlice(res.leafArr, rng.start, rng.end)
			defer values.Release()
			if err = WriteArrowToColumn(ctx, cw, values, res.defLevels, res.repLevels, res.leafIsNullable); err != nil {
				return err
			}
		}
	}
	return nil
}

// WriteArrowToColumn writes apache arrow columnar data directly to a
// ColumnWriter. Returns a non-nil error if the array data type is not
// compatible with the concrete writer type.
//
// leafArr is always a primitive (possibly dictionary encoded) array.
// leafFieldNullable indicates whether the leaf array is considered nullable
// according to its schema in a Table or its parent array.
func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error {
	// Leaf nulls are canonical when there is only a single null element
	// after a list and it is at the leaf.
	colLevelInfo := cw.LevelInfo()
	singleNullable := (colLevelInfo.DefLevel == colLevelInfo.RepeatedAncestorDefLevel+1) && leafFieldNullable
	maybeParentNulls := colLevelInfo.HasNullableValues() && !singleNullable

	if maybeParentNulls && !cw.HasBitsBuffer() {
		buf := memory.NewResizableBuffer(cw.Properties().Allocator())
		buf.Resize(int(bitutil.BytesForBits(cw.Properties().WriteBatchSize())))
		cw.SetBitsBuffer(buf)
	}

	arrCtx := arrowCtxFromContext(ctx)
	defer func() {
		if arrCtx.dataBuffer != nil {
			arrCtx.dataBuffer.Release()
			arrCtx.dataBuffer = nil
		}
	}()

	if leafArr.DataType().ID() == arrow.DICTIONARY {
		return writeDictionaryArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls)
	}
	return writeDenseArrow(arrCtx, cw, leafArr, defLevels, repLevels, maybeParentNulls)
}
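
// Illustrative usage (a sketch, not part of the original source): callers
// normally reach WriteArrowToColumn through the higher-level file writer in
// this package rather than calling it directly. Assuming an io.Writer `w`,
// an *arrow.Schema `sc`, and an arrow.Record `rec` built elsewhere:
//
//	fw, err := NewFileWriter(sc, w, parquet.NewWriterProperties(), DefaultWriterProps())
//	if err != nil {
//		// handle error
//	}
//	defer fw.Close()
//	// Write fans each leaf column of rec out through WriteArrowToColumn.
//	if err := fw.Write(rec); err != nil {
//		// handle error
//	}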

// binaryarr and binary64arr abstract over the 32-bit and 64-bit offset
// variants of arrow binary/string arrays.
type binaryarr interface {
	ValueOffsets() []int32
}

type binary64arr interface {
	ValueOffsets() []int64
}
func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
	if leafArr.DataType().ID() == arrow.EXTENSION {
		extensionArray := leafArr.(array.ExtensionArray)
		// write the underlying storage array of the extension type
		leafArr = extensionArray.Storage()
	}

	noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0

	if ctx.dataBuffer == nil {
		ctx.dataBuffer = memory.NewResizableBuffer(cw.Properties().Allocator())
	}

	switch wr := cw.(type) {
	case *file.BooleanColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.BOOL {
			return fmt.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID())
		}
		// the parquet writer takes a []bool, so expand the arrow bitmap
		// representation into the scratch buffer
		if leafArr.Len() == 0 {
			_, err = wr.WriteBatch(nil, defLevels, repLevels)
			break
		}

		ctx.dataBuffer.ResizeNoShrink(leafArr.Len())
		buf := ctx.dataBuffer.Bytes()
		// reinterpret the scratch byte buffer as []bool (same element size)
		data := *(*[]bool)(unsafe.Pointer(&buf))
		for idx := range data {
			data[idx] = leafArr.(*array.Boolean).Value(idx)
		}
		if !maybeParentNulls && noNulls {
			wr.WriteBatch(data, defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.Int32ColumnChunkWriter:
		var data []int32
		switch leafArr.DataType().ID() {
		case arrow.INT32:
			data = leafArr.(*array.Int32).Int32Values()
		case arrow.DATE32, arrow.UINT32:
			// same width as int32, so just reinterpret the values buffer
			if leafArr.Data().Buffers()[1] != nil {
				data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
			}
		case arrow.TIME32:
			if leafArr.DataType().(*arrow.Time32Type).Unit != arrow.Second {
				if leafArr.Data().Buffers()[1] != nil {
					data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
				}
			} else {
				// parquet TIME32 is always milliseconds; coerce seconds to millis
				ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				for idx, val := range leafArr.(*array.Time32).Time32Values() {
					data[idx] = int32(val) * 1000
				}
			}
		case arrow.NULL:
			wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0)
			return

		default:
			// the remaining compatible types are narrower than, or otherwise
			// representable as, int32, so convert them element-wise through
			// the scratch buffer
			ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			switch leafArr.DataType().ID() {
			case arrow.UINT8:
				for idx, val := range leafArr.(*array.Uint8).Uint8Values() {
					data[idx] = int32(val)
				}
			case arrow.INT8:
				for idx, val := range leafArr.(*array.Int8).Int8Values() {
					data[idx] = int32(val)
				}
			case arrow.UINT16:
				for idx, val := range leafArr.(*array.Uint16).Uint16Values() {
					data[idx] = int32(val)
				}
			case arrow.INT16:
				for idx, val := range leafArr.(*array.Int16).Int16Values() {
					data[idx] = int32(val)
				}
			case arrow.DATE64:
				// convert milliseconds since epoch to days since epoch
				for idx, val := range leafArr.(*array.Date64).Date64Values() {
					data[idx] = int32(val / 86400000)
				}
			case arrow.DECIMAL128:
				for idx, val := range leafArr.(*array.Decimal128).Values() {
					debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
					data[idx] = int32(val.LowBits())
				}
			case arrow.DECIMAL256:
				for idx, val := range leafArr.(*array.Decimal256).Values() {
					debug.Assert(val.Array()[3] == 0 || val.Array()[3] == math.MaxUint64, "casting Decimal256 greater than the value range; high bits must be 0 or -1")
					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal256 to int32 when value > MaxUint32")
					data[idx] = int32(val.LowBits())
				}
			default:
				return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name())
			}
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Int64ColumnChunkWriter:
		var data []int64
		switch leafArr.DataType().ID() {
		case arrow.TIMESTAMP:
			tstype := leafArr.DataType().(*arrow.TimestampType)
			if ctx.props.coerceTimestamps {
				// user explicitly requested coercion to a specific unit
				if tstype.Unit == ctx.props.coerceTimestampUnit {
					// no conversion necessary
					if leafArr.Data().Buffers()[1] != nil {
						data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
						data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
					}
				} else {
					ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
					data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
					if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &ctx.props, data); err != nil {
						return err
					}
				}
			} else if (cw.Properties().Version() == parquet.V1_0 || cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
				// absent superseding user instructions, nanosecond timestamps
				// are coerced to microseconds for parquet format versions
				// that predate nanosecond support (1.0 and 2.4)
				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), WithTruncatedTimestamps(true))
				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
					return err
				}
			} else if tstype.Unit == arrow.Second {
				// absent superseding user instructions, second timestamps are
				// coerced to milliseconds, the coarsest unit parquet supports
				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Millisecond))
				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
					return err
				}
			} else {
				// no data conversion necessary
				if leafArr.Data().Buffers()[1] != nil {
					data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
				}
			}
		case arrow.UINT32:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Uint32).Uint32Values() {
				data[idx] = int64(val)
			}
		case arrow.INT64:
			data = leafArr.(*array.Int64).Int64Values()
		case arrow.UINT64, arrow.TIME64, arrow.DATE64:
			if leafArr.Data().Buffers()[1] != nil {
				data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
			}
		case arrow.DECIMAL128:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Decimal128).Values() {
				debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
				data[idx] = int64(val.LowBits())
			}
		case arrow.DECIMAL256:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Decimal256).Values() {
				debug.Assert(val.Array()[3] == 0 || val.Array()[3] == math.MaxUint64, "trying to cast Decimal256 to int64 greater than range, high bits must be 0 or -1")
				data[idx] = int64(val.LowBits())
			}
		default:
			return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name())
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Int96ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.TIMESTAMP {
			return errors.New("unsupported arrow type to write to Int96 column")
		}
		ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
		data := parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
		input := leafArr.(*array.Timestamp).TimestampValues()
		unit := leafArr.DataType().(*arrow.TimestampType).Unit
		for idx, val := range input {
			arrowTimestampToImpalaTimestamp(unit, int64(val), &data[idx])
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Float32ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.FLOAT32 {
			return errors.New("invalid column type to write to Float32")
		}
		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.Float64ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.FLOAT64 {
			return errors.New("invalid column type to write to Float64")
		}
		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.ByteArrayColumnChunkWriter:
		var (
			buffer   = leafArr.Data().Buffers()[2]
			valueBuf []byte
		)

		if buffer == nil {
			valueBuf = []byte{}
		} else {
			valueBuf = buffer.Bytes()
		}

		data := make([]parquet.ByteArray, leafArr.Len())
		switch leafArr.DataType().ID() {
		case arrow.BINARY, arrow.STRING:
			offsets := leafArr.(binaryarr).ValueOffsets()
			for i := range data {
				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
			}
		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
			offsets := leafArr.(binary64arr).ValueOffsets()
			for i := range data {
				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
			}
		default:
			return fmt.Errorf("%w: invalid column type to write to ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}

	case *file.FixedLenByteArrayColumnChunkWriter:
		switch dt := leafArr.DataType().(type) {
		case *arrow.FixedSizeBinaryType:
			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			for idx := range data {
				data[idx] = leafArr.(*array.FixedSizeBinary).Value(idx)
			}
			if !maybeParentNulls && noNulls {
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
			}
		case *arrow.Decimal128Type:
			// parquet decimals are stored as FixedLenByteArray values whose
			// length is proportional to the precision, while arrow Decimal128
			// values are always 16 bytes. Convert each value to big-endian in
			// the scratch buffer and slice off the leading bytes that the
			// parquet type length does not include.
			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
			scratch := ctx.dataBuffer.Bytes()
			typeLen := wr.Descr().TypeLength()
			fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray {
				out := scratch[offset : offset+typeLen]
				binary.BigEndian.PutUint64(scratch, uint64(in.HighBits()))
				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], in.LowBits())
				scratch = scratch[2*arrow.Uint64SizeBytes:]
				return out
			}

			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			arr := leafArr.(*array.Decimal128)
			if leafArr.NullN() == 0 {
				for idx := range data {
					data[idx] = fixDecimalEndianness(arr.Value(idx))
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						data[idx] = fixDecimalEndianness(arr.Value(idx))
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		case *arrow.Decimal256Type:
			// same as the Decimal128 case above, except arrow Decimal256
			// values are always 32 bytes, written big-endian from the most
			// significant 64-bit word down
			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
			scratch := ctx.dataBuffer.Bytes()
			typeLen := wr.Descr().TypeLength()
			fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray {
				out := scratch[offset : offset+typeLen]
				vals := in.Array()
				binary.BigEndian.PutUint64(scratch, vals[3])
				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2])
				binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1])
				binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0])
				scratch = scratch[4*arrow.Uint64SizeBytes:]
				return out
			}

			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			arr := leafArr.(*array.Decimal256)
			if leafArr.NullN() == 0 {
				for idx := range data {
					data[idx] = fixDecimalEndianness(arr.Value(idx))
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						data[idx] = fixDecimalEndianness(arr.Value(idx))
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		case *arrow.Float16Type:
			typeLen := wr.Descr().TypeLength()
			if typeLen != arrow.Float16SizeBytes {
				return fmt.Errorf("%w: invalid FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid, typeLen)
			}

			arr := leafArr.(*array.Float16)
			rawValues := arrow.Float16Traits.CastToBytes(arr.Values())
			data := make([]parquet.FixedLenByteArray, arr.Len())

			if arr.NullN() == 0 {
				for idx := range data {
					offset := idx * typeLen
					data[idx] = rawValues[offset : offset+typeLen]
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						offset := idx * typeLen
						data[idx] = rawValues[offset : offset+typeLen]
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		default:
			return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
		}
	default:
		return errors.New("unknown column writer physical type")
	}
	return
}

// coerceType indicates how a timestamp value must be adjusted when converting
// between time units: multiplied by a factor (widening to a finer unit) or
// divided (narrowing to a coarser unit, which is potentially lossy).
type coerceType int8

const (
	coerceInvalid coerceType = iota
	coerceDivide
	coerceMultiply
)

type coercePair struct {
	typ    coerceType
	factor int64
}

// factors[source][target] describes how to convert a timestamp from one time
// unit to another. Coercing to seconds is always invalid because parquet has
// no seconds timestamp unit.
var factors = map[arrow.TimeUnit]map[arrow.TimeUnit]coercePair{
	arrow.Second: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceMultiply, 1000},
		arrow.Microsecond: {coerceMultiply, 1000000},
		arrow.Nanosecond:  {coerceMultiply, 1000000000},
	},
	arrow.Millisecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceMultiply, 1},
		arrow.Microsecond: {coerceMultiply, 1000},
		arrow.Nanosecond:  {coerceMultiply, 1000000},
	},
	arrow.Microsecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceDivide, 1000},
		arrow.Microsecond: {coerceMultiply, 1},
		arrow.Nanosecond:  {coerceMultiply, 1000},
	},
	arrow.Nanosecond: {
		arrow.Second:      {coerceInvalid, 0},
		arrow.Millisecond: {coerceDivide, 1000000},
		arrow.Microsecond: {coerceDivide, 1000},
		arrow.Nanosecond:  {coerceMultiply, 1},
	},
}
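
// Worked example (illustrative only): coercing nanoseconds to milliseconds
// looks up factors[arrow.Nanosecond][arrow.Millisecond] = {coerceDivide, 1000000},
// so 1_500_000_000 ns converts exactly to 1500 ms, while 1_500_000_500 ns is
// not evenly divisible and either errors or truncates in writeCoerceTimestamps
// below, depending on whether truncated timestamps were allowed in the writer
// properties.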

// writeCoerceTimestamps converts the timestamp values in arr from their
// source unit to the target unit requested in props, writing the results
// into out. Narrowing conversions return an error if they would lose data,
// unless truncated timestamps were explicitly allowed.
func writeCoerceTimestamps(arr *array.Timestamp, props *ArrowWriterProperties, out []int64) error {
	source := arr.DataType().(*arrow.TimestampType).Unit
	target := props.coerceTimestampUnit
	truncation := props.allowTruncatedTimestamps

	vals := arr.TimestampValues()
	multiply := func(factor int64) error {
		for idx, val := range vals {
			out[idx] = int64(val) * factor
		}
		return nil
	}

	divide := func(factor int64) error {
		for idx, val := range vals {
			if !truncation && arr.IsValid(idx) && (int64(val)%factor != 0) {
				return fmt.Errorf("casting from %s to %s would lose data", source, target)
			}
			out[idx] = int64(val) / factor
		}
		return nil
	}

	coerce := factors[source][target]
	switch coerce.typ {
	case coerceMultiply:
		return multiply(coerce.factor)
	case coerceDivide:
		return divide(coerce.factor)
	default:
		panic("invalid coercion")
	}
}

const (
	// 2440588 is the Julian Day Number of the Unix epoch, 1970-01-01
	julianEpochOffsetDays int64 = 2440588
	nanoSecondsPerDay           = 24 * 60 * 60 * 1000 * 1000 * 1000
)

// arrowTimestampToImpalaTimestamp converts an arrow timestamp in the given
// unit to the Impala-style parquet INT96 layout: nanoseconds within the day
// in the first 8 bytes (little-endian) and the Julian day number in the
// last 4.
func arrowTimestampToImpalaTimestamp(unit arrow.TimeUnit, t int64, out *parquet.Int96) {
	var d time.Duration
	switch unit {
	case arrow.Second:
		d = time.Duration(t) * time.Second
	case arrow.Microsecond:
		d = time.Duration(t) * time.Microsecond
	case arrow.Millisecond:
		d = time.Duration(t) * time.Millisecond
	case arrow.Nanosecond:
		d = time.Duration(t) * time.Nanosecond
	}

	julianDays := (int64(d.Hours()) / 24) + julianEpochOffsetDays
	// the nanoseconds-within-day must come from the unit-normalized duration,
	// not from the raw value, which may be in coarser units than nanoseconds
	lastDayNanos := d.Nanoseconds() % nanoSecondsPerDay
	binary.LittleEndian.PutUint64((*out)[:8], uint64(lastDayNanos))
	binary.LittleEndian.PutUint32((*out)[8:], uint32(julianDays))
}
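
// Worked example (illustrative only): t = 0 in any unit is the Unix epoch, so
// d = 0, julianDays = 0/24 + 2440588 = 2440588, and lastDayNanos = 0, which
// is the INT96 encoding of 1970-01-01T00:00:00. One second later (t = 1 with
// unit = arrow.Second) keeps julianDays at 2440588 and stores
// lastDayNanos = 1_000_000_000.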