...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "encoding/binary"
21 "fmt"
22 "unsafe"
23
24 "github.com/apache/arrow/go/v15/arrow"
25 "github.com/apache/arrow/go/v15/arrow/array"
26 "github.com/apache/arrow/go/v15/internal/bitutils"
27 "github.com/apache/arrow/go/v15/internal/utils"
28 "github.com/apache/arrow/go/v15/parquet"
29 )
30
31
32
33 type PlainByteArrayEncoder struct {
34 encoder
35
36 bitSetReader bitutils.SetBitRunReader
37 }
38
39
40 func (enc *PlainByteArrayEncoder) PutByteArray(val parquet.ByteArray) {
41 inc := val.Len() + arrow.Uint32SizeBytes
42 enc.sink.Reserve(inc)
43 vlen := utils.ToLEUint32(uint32(val.Len()))
44 enc.sink.UnsafeWrite((*(*[4]byte)(unsafe.Pointer(&vlen)))[:])
45 enc.sink.UnsafeWrite(val)
46 }
47
48
49 func (enc *PlainByteArrayEncoder) Put(in []parquet.ByteArray) {
50 for _, val := range in {
51 enc.PutByteArray(val)
52 }
53 }
54
55
56
57
58
59 func (enc *PlainByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
60 if validBits != nil {
61 if enc.bitSetReader == nil {
62 enc.bitSetReader = bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in)))
63 } else {
64 enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in)))
65 }
66
67 for {
68 run := enc.bitSetReader.NextRun()
69 if run.Length == 0 {
70 break
71 }
72 enc.Put(in[int(run.Pos):int(run.Pos+run.Length)])
73 }
74 } else {
75 enc.Put(in)
76 }
77 }
78
79
80 func (PlainByteArrayEncoder) Type() parquet.Type {
81 return parquet.Types.ByteArray
82 }
83
84
85
86 func (enc *DictByteArrayEncoder) WriteDict(out []byte) {
87 enc.memo.(BinaryMemoTable).VisitValues(0, func(v []byte) {
88 binary.LittleEndian.PutUint32(out, uint32(len(v)))
89 out = out[arrow.Uint32SizeBytes:]
90 copy(out, v)
91 out = out[len(v):]
92 })
93 }
94
95
96
97 func (enc *DictByteArrayEncoder) PutByteArray(in parquet.ByteArray) {
98 memoIdx, found, err := enc.memo.GetOrInsert(in)
99 if err != nil {
100 panic(err)
101 }
102 if !found {
103 enc.dictEncodedSize += in.Len() + arrow.Uint32SizeBytes
104 }
105 enc.addIndex(memoIdx)
106 }
107
108
109 func (enc *DictByteArrayEncoder) Put(in []parquet.ByteArray) {
110 for _, val := range in {
111 enc.PutByteArray(val)
112 }
113 }
114
115
116 func (enc *DictByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
117 bitutils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), func(pos, length int64) error {
118 for i := int64(0); i < length; i++ {
119 enc.PutByteArray(in[i+pos])
120 }
121 return nil
122 })
123 }
124
125
126
127
128
129
130 func (enc *DictByteArrayEncoder) PutDictionary(values arrow.Array) error {
131 if err := enc.canPutDictionary(values); err != nil {
132 return err
133 }
134
135 if !arrow.IsBaseBinary(values.DataType().ID()) {
136 return fmt.Errorf("%w: only binary and string arrays are supported", arrow.ErrInvalid)
137 }
138
139 arr := values.(array.BinaryLike)
140 data := arr.ValueBytes()
141 for i := 0; i < arr.Len(); i++ {
142 curOffset := arr.ValueOffset64(i)
143 var v []byte
144 if i == arr.Len()-1 {
145 v = data[curOffset:]
146 } else {
147 v = data[curOffset:arr.ValueOffset64(i+1)]
148 }
149 enc.dictEncodedSize += len(v) + arrow.Uint32SizeBytes
150 if _, _, err := enc.memo.GetOrInsert(v); err != nil {
151 return err
152 }
153 }
154
155 values.Retain()
156 enc.preservedDict = values
157 return nil
158 }
159
View as plain text