...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "encoding/binary"
21
22 "github.com/apache/arrow/go/v15/arrow"
23 "github.com/apache/arrow/go/v15/arrow/array"
24 "github.com/apache/arrow/go/v15/arrow/memory"
25 "github.com/apache/arrow/go/v15/internal/utils"
26 "github.com/apache/arrow/go/v15/parquet"
27 pqutils "github.com/apache/arrow/go/v15/parquet/internal/utils"
28 "golang.org/x/xerrors"
29 )
30
31
32
33
34
35
36
37
38 type PlainByteArrayDecoder struct {
39 decoder
40 }
41
42
43 func (PlainByteArrayDecoder) Type() parquet.Type {
44 return parquet.Types.ByteArray
45 }
46
47
48
49
50
51 func (pbad *PlainByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
52 max := utils.Min(len(out), pbad.nvals)
53
54 for i := 0; i < max; i++ {
55
56
57 if len(pbad.data) < 4 {
58 return i, xerrors.New("parquet: eof reading bytearray")
59 }
60
61
62 byteLen := int32(binary.LittleEndian.Uint32(pbad.data[:4]))
63 if byteLen < 0 {
64 return i, xerrors.New("parquet: invalid BYTE_ARRAY value")
65 }
66
67 if int64(len(pbad.data)) < int64(byteLen)+4 {
68 return i, xerrors.New("parquet: eof reading bytearray")
69 }
70
71 out[i] = pbad.data[4 : byteLen+4 : byteLen+4]
72 pbad.data = pbad.data[byteLen+4:]
73 }
74
75 pbad.nvals -= max
76 return max, nil
77 }
78
79
80
81 func (pbad *PlainByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
82 toRead := len(out) - nullCount
83 valuesRead, err := pbad.Decode(out[:toRead])
84 if err != nil {
85 return valuesRead, err
86 }
87 if valuesRead != toRead {
88 return valuesRead, xerrors.New("parquet: number of values / definition levels read did not match")
89 }
90
91 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
92 }
93
94 func (d *DictByteArrayDecoder) InsertDictionary(bldr array.Builder) error {
95 conv := d.dictValueDecoder.(*ByteArrayDictConverter)
96 dictLength := cap(conv.dict)
97 conv.ensure(pqutils.IndexType(dictLength))
98
99 byteArrayData := memory.NewResizableBuffer(d.mem)
100 defer byteArrayData.Release()
101 byteArrayOffsets := memory.NewResizableBuffer(d.mem)
102 defer byteArrayOffsets.Release()
103
104 var totalLen int
105 for _, v := range conv.dict {
106 totalLen += len(v)
107 }
108 byteArrayData.ResizeNoShrink(totalLen)
109 byteArrayOffsets.ResizeNoShrink((dictLength + 1) * arrow.Int32SizeBytes)
110
111 byteData := byteArrayData.Bytes()
112 byteOffsets := arrow.Int32Traits.CastFromBytes(byteArrayOffsets.Bytes())
113
114 var offset int32
115 for i, v := range conv.dict {
116 n := copy(byteData, v)
117 byteData, byteOffsets[i] = byteData[n:], offset
118 offset += int32(n)
119 }
120 byteOffsets[dictLength] = offset
121
122 data := array.NewData(bldr.Type().(*arrow.DictionaryType).ValueType, dictLength,
123 []*memory.Buffer{nil, byteArrayOffsets, byteArrayData}, nil, 0, 0)
124 defer data.Release()
125 arr := array.NewBinaryData(data)
126 defer arr.Release()
127
128 binaryBldr := bldr.(*array.BinaryDictionaryBuilder)
129 return binaryBldr.InsertDictValues(arr)
130 }
131
View as plain text