...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "fmt"
21
22 "github.com/apache/arrow/go/v15/arrow"
23 "github.com/apache/arrow/go/v15/internal/bitutils"
24 "github.com/apache/arrow/go/v15/parquet"
25 )
26
27
28
29 type PlainFixedLenByteArrayEncoder struct {
30 encoder
31
32 bitSetReader bitutils.SetBitRunReader
33 }
34
35
36 func (enc *PlainFixedLenByteArrayEncoder) Put(in []parquet.FixedLenByteArray) {
37 typeLen := enc.descr.TypeLength()
38 if typeLen == 0 {
39 return
40 }
41
42 bytesNeeded := len(in) * typeLen
43 enc.sink.Reserve(bytesNeeded)
44 for _, val := range in {
45 if val == nil {
46 panic("value cannot be nil")
47 }
48 enc.sink.UnsafeWrite(val[:typeLen])
49 }
50 }
51
52
53 func (enc *PlainFixedLenByteArrayEncoder) PutSpaced(in []parquet.FixedLenByteArray, validBits []byte, validBitsOffset int64) {
54 if validBits != nil {
55 if enc.bitSetReader == nil {
56 enc.bitSetReader = bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in)))
57 } else {
58 enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in)))
59 }
60
61 for {
62 run := enc.bitSetReader.NextRun()
63 if run.Length == 0 {
64 break
65 }
66 enc.Put(in[int(run.Pos):int(run.Pos+run.Length)])
67 }
68 } else {
69 enc.Put(in)
70 }
71 }
72
73
74 func (PlainFixedLenByteArrayEncoder) Type() parquet.Type {
75 return parquet.Types.FixedLenByteArray
76 }
77
78
79
80 func (enc *DictFixedLenByteArrayEncoder) WriteDict(out []byte) {
81 enc.memo.(BinaryMemoTable).CopyFixedWidthValues(0, enc.typeLen, out)
82 }
83
84
85 func (enc *DictFixedLenByteArrayEncoder) Put(in []parquet.FixedLenByteArray) {
86 for _, v := range in {
87 memoIdx, found, err := enc.memo.GetOrInsert(v)
88 if err != nil {
89 panic(err)
90 }
91 if !found {
92 enc.dictEncodedSize += enc.typeLen
93 }
94 enc.addIndex(memoIdx)
95 }
96 }
97
98
99 func (enc *DictFixedLenByteArrayEncoder) PutSpaced(in []parquet.FixedLenByteArray, validBits []byte, validBitsOffset int64) {
100 bitutils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), func(pos, length int64) error {
101 enc.Put(in[pos : pos+length])
102 return nil
103 })
104 }
105
106
107
108
109
110
111 func (enc *DictFixedLenByteArrayEncoder) PutDictionary(values arrow.Array) error {
112 if values.DataType().ID() != arrow.FIXED_SIZE_BINARY && values.DataType().ID() != arrow.DECIMAL {
113 return fmt.Errorf("%w: only fixed size binary and decimal128 arrays are supported", arrow.ErrInvalid)
114 }
115
116 if values.DataType().(arrow.FixedWidthDataType).Bytes() != enc.typeLen {
117 return fmt.Errorf("%w: size mismatch: %s should have been %d wide",
118 arrow.ErrInvalid, values.DataType(), enc.typeLen)
119 }
120
121 if err := enc.canPutDictionary(values); err != nil {
122 return err
123 }
124
125 enc.dictEncodedSize += enc.typeLen * values.Len()
126 data := values.Data().Buffers()[1].Bytes()[values.Data().Offset()*enc.typeLen:]
127 for i := 0; i < values.Len(); i++ {
128 _, _, err := enc.memo.GetOrInsert(data[i*enc.typeLen : (i+1)*enc.typeLen])
129 if err != nil {
130 return err
131 }
132 }
133
134 values.Retain()
135 enc.preservedDict = values
136 return nil
137 }
138
View as plain text