...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "github.com/apache/arrow/go/v15/arrow/memory"
21 "github.com/apache/arrow/go/v15/internal/utils"
22 "github.com/apache/arrow/go/v15/parquet"
23 "golang.org/x/xerrors"
24 )
25
26
27
28
29
30
31
32
33
34
// DeltaLengthByteArrayEncoder encodes ByteArray values in the layout of
// parquet's DELTA_LENGTH_BYTE_ARRAY encoding:
//   - every value's length goes to a delta bit-packed int32 encoder;
//   - the raw value bytes are appended back to back to the embedded
//     encoder's sink.
//
// FlushValues emits the length stream first, followed by the concatenated bytes.
type DeltaLengthByteArrayEncoder struct {
	// encoder supplies the byte sink (enc.sink) that accumulates the
	// concatenated value bytes.
	encoder

	// lengthEncoder receives one int32 length per value written via Put.
	lengthEncoder *DeltaBitPackInt32Encoder
}
40
41
42 func (enc *DeltaLengthByteArrayEncoder) Put(in []parquet.ByteArray) {
43 lengths := make([]int32, len(in))
44 totalLen := int(0)
45 for idx, val := range in {
46 lengths[idx] = int32(val.Len())
47 totalLen += val.Len()
48 }
49
50 enc.lengthEncoder.Put(lengths)
51 enc.sink.Reserve(totalLen)
52 for _, val := range in {
53 enc.sink.UnsafeWrite(val)
54 }
55 }
56
57
58
59 func (enc *DeltaLengthByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
60 if validBits != nil {
61 data := make([]parquet.ByteArray, len(in))
62 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
63 enc.Put(data[:nvalid])
64 } else {
65 enc.Put(in)
66 }
67 }
68
69
70 func (DeltaLengthByteArrayEncoder) Type() parquet.Type {
71 return parquet.Types.ByteArray
72 }
73
74
75
76 func (enc *DeltaLengthByteArrayEncoder) FlushValues() (Buffer, error) {
77 ret, err := enc.lengthEncoder.FlushValues()
78 if err != nil {
79 return nil, err
80 }
81 defer ret.Release()
82
83 data := enc.sink.Finish()
84 defer data.Release()
85
86 output := bufferPool.Get().(*memory.Buffer)
87 output.ResizeNoShrink(ret.Len() + data.Len())
88 copy(output.Bytes(), ret.Bytes())
89 copy(output.Bytes()[ret.Len():], data.Bytes())
90 return poolBuffer{output}, nil
91 }
92
93
94
// DeltaLengthByteArrayDecoder decodes ByteArray values from data laid out as a
// delta bit-packed block of int32 lengths followed by the concatenated value
// bytes. SetData decodes all lengths up front; Decode then slices each value
// out of the remaining data.
type DeltaLengthByteArrayDecoder struct {
	// decoder holds the value bytes still to be consumed (d.data) and the
	// number of values remaining (d.nvals).
	decoder

	// mem is the allocator reported by Allocator and handed to the nested
	// length decoder created in SetData.
	mem memory.Allocator
	// lengths holds the decoded lengths of values not yet returned by Decode;
	// Decode advances it as values are consumed.
	lengths []int32
}
101
102
103 func (DeltaLengthByteArrayDecoder) Type() parquet.Type {
104 return parquet.Types.ByteArray
105 }
106
107 func (d *DeltaLengthByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
108
109
110
111 func (d *DeltaLengthByteArrayDecoder) SetData(nvalues int, data []byte) error {
112 dec := DeltaBitPackInt32Decoder{
113 deltaBitPackDecoder: &deltaBitPackDecoder{
114 decoder: newDecoderBase(d.encoding, d.descr),
115 mem: d.mem}}
116
117 if err := dec.SetData(nvalues, data); err != nil {
118 return err
119 }
120 d.lengths = make([]int32, dec.totalValues)
121 dec.Decode(d.lengths)
122
123 return d.decoder.SetData(nvalues, data[int(dec.bytesRead()):])
124 }
125
126
127
128 func (d *DeltaLengthByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
129 max := utils.Min(len(out), d.nvals)
130 for i := 0; i < max; i++ {
131 out[i] = d.data[:d.lengths[i]:d.lengths[i]]
132 d.data = d.data[d.lengths[i]:]
133 }
134 d.nvals -= max
135 d.lengths = d.lengths[max:]
136 return max, nil
137 }
138
139
140 func (d *DeltaLengthByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
141 toread := len(out) - nullCount
142 values, _ := d.Decode(out[:toread])
143 if values != toread {
144 return values, xerrors.New("parquet: number of values / definition levels read did not match")
145 }
146
147 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
148 }
149
View as plain text