...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package encoding
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "errors"
23 "fmt"
24 "io"
25
26 "github.com/apache/arrow/go/v15/arrow/bitutil"
27 shared_utils "github.com/apache/arrow/go/v15/internal/utils"
28 "github.com/apache/arrow/go/v15/parquet"
29 "github.com/apache/arrow/go/v15/parquet/internal/utils"
30 )
31
32
33
34 type PlainBooleanDecoder struct {
35 decoder
36
37 bitOffset int
38 }
39
40
41 func (PlainBooleanDecoder) Type() parquet.Type {
42 return parquet.Types.Boolean
43 }
44
45 func (dec *PlainBooleanDecoder) SetData(nvals int, data []byte) error {
46 if err := dec.decoder.SetData(nvals, data); err != nil {
47 return err
48 }
49 dec.bitOffset = 0
50 return nil
51 }
52
53
54
55
56
57 func (dec *PlainBooleanDecoder) Decode(out []bool) (int, error) {
58 max := shared_utils.Min(len(out), dec.nvals)
59
60
61 unalignedExtract := func(i int) int {
62 for ; dec.bitOffset < 8 && i < max; i, dec.bitOffset = i+1, dec.bitOffset+1 {
63 out[i] = (dec.data[0] & byte(1<<dec.bitOffset)) != 0
64 }
65 if dec.bitOffset == 8 {
66
67 dec.bitOffset = 0
68 dec.data = dec.data[1:]
69 }
70 return i
71 }
72
73
74
75 i := 0
76 if dec.bitOffset != 0 {
77 i = unalignedExtract(i)
78 }
79
80
81
82 bitsRemain := max - i
83 batch := (bitsRemain / 8) * 8
84 if batch > 0 {
85
86
87 alignedBytes := bitutil.BytesForBits(int64(batch))
88 utils.BytesToBools(dec.data[:alignedBytes], out[i:])
89
90 dec.data = dec.data[alignedBytes:]
91 i += int(alignedBytes) * 8
92 }
93
94
95 _ = unalignedExtract(i)
96
97 dec.nvals -= max
98 return max, nil
99 }
100
101
102
103 func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
104 if nullCount > 0 {
105 toRead := len(out) - nullCount
106 valuesRead, err := dec.Decode(out[:toRead])
107 if err != nil {
108 return 0, err
109 }
110 if valuesRead != toRead {
111 return valuesRead, errors.New("parquet: boolean decoder: number of values / definition levels read did not match")
112 }
113 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
114 }
115 return dec.Decode(out)
116 }
117
118 type RleBooleanDecoder struct {
119 decoder
120
121 rleDec *utils.RleDecoder
122 }
123
124 func (RleBooleanDecoder) Type() parquet.Type {
125 return parquet.Types.Boolean
126 }
127
128 func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
129 dec.nvals = nvals
130
131 if len(data) < 4 {
132 return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
133 }
134
135
136 nbytes := binary.LittleEndian.Uint32(data[:4])
137 if nbytes > uint32(len(data)-4) {
138 return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
139 }
140
141 dec.data = data[4:]
142 if dec.rleDec == nil {
143 dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
144 } else {
145 dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
146 }
147 return nil
148 }
149
150 func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
151 max := shared_utils.Min(len(out), dec.nvals)
152
153 var (
154 buf [1024]uint64
155 n = max
156 )
157
158 for n > 0 {
159 batch := shared_utils.Min(len(buf), n)
160 decoded := dec.rleDec.GetBatch(buf[:batch])
161 if decoded != batch {
162 return max - n, io.ErrUnexpectedEOF
163 }
164
165 for i := 0; i < batch; i++ {
166 out[i] = buf[i] != 0
167 }
168 n -= batch
169 out = out[batch:]
170 }
171
172 dec.nvals -= max
173 return max, nil
174 }
175
176 func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
177 if nullCount > 0 {
178 toRead := len(out) - nullCount
179 valuesRead, err := dec.Decode(out[:toRead])
180 if err != nil {
181 return 0, err
182 }
183 if valuesRead != toRead {
184 return valuesRead, errors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
185 }
186 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
187 }
188 return dec.Decode(out)
189 }
190
View as plain text