...
1 package lz4
2
3 import (
4 "bytes"
5 "io"
6
7 "github.com/pierrec/lz4/v4/internal/lz4block"
8 "github.com/pierrec/lz4/v4/internal/lz4errors"
9 "github.com/pierrec/lz4/v4/internal/lz4stream"
10 )
11
12 var readerStates = []aState{
13 noState: newState,
14 errorState: newState,
15 newState: readState,
16 readState: closedState,
17 closedState: newState,
18 }
19
20
21 func NewReader(r io.Reader) *Reader {
22 return newReader(r, false)
23 }
24
25 func newReader(r io.Reader, legacy bool) *Reader {
26 zr := &Reader{frame: lz4stream.NewFrame()}
27 zr.state.init(readerStates)
28 _ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
29 zr.Reset(r)
30 return zr
31 }
32
33
34 type Reader struct {
35 state _State
36 src io.Reader
37 num int
38 frame *lz4stream.Frame
39 data []byte
40 reads chan []byte
41 idx int
42 handler func(int)
43 cum uint32
44 dict []byte
45 }
46
47 func (*Reader) private() {}
48
49 func (r *Reader) Apply(options ...Option) (err error) {
50 defer r.state.check(&err)
51 switch r.state.state {
52 case newState:
53 case errorState:
54 return r.state.err
55 default:
56 return lz4errors.ErrOptionClosedOrError
57 }
58 for _, o := range options {
59 if err = o(r); err != nil {
60 return
61 }
62 }
63 return
64 }
65
66
67 func (r *Reader) Size() int {
68 switch r.state.state {
69 case readState, closedState:
70 if r.frame.Descriptor.Flags.Size() {
71 return int(r.frame.Descriptor.ContentSize)
72 }
73 }
74 return 0
75 }
76
77 func (r *Reader) isNotConcurrent() bool {
78 return r.num == 1
79 }
80
81 func (r *Reader) init() error {
82 err := r.frame.ParseHeaders(r.src)
83 if err != nil {
84 return err
85 }
86 if !r.frame.Descriptor.Flags.BlockIndependence() {
87
88
89 r.num = 1
90 }
91 data, err := r.frame.InitR(r.src, r.num)
92 if err != nil {
93 return err
94 }
95 r.reads = data
96 r.idx = 0
97 size := r.frame.Descriptor.Flags.BlockSizeIndex()
98 r.data = size.Get()
99 r.cum = 0
100 return nil
101 }
102
103 func (r *Reader) Read(buf []byte) (n int, err error) {
104 defer r.state.check(&err)
105 switch r.state.state {
106 case readState:
107 case closedState, errorState:
108 return 0, r.state.err
109 case newState:
110
111 if err = r.init(); r.state.next(err) {
112 return
113 }
114 default:
115 return 0, r.state.fail()
116 }
117 for len(buf) > 0 {
118 var bn int
119 if r.idx == 0 {
120 if r.isNotConcurrent() {
121 bn, err = r.read(buf)
122 } else {
123 lz4block.Put(r.data)
124 r.data = <-r.reads
125 if len(r.data) == 0 {
126
127 err = r.frame.Blocks.ErrorR()
128 }
129 }
130 switch err {
131 case nil:
132 case io.EOF:
133 if er := r.frame.CloseR(r.src); er != nil {
134 err = er
135 }
136 lz4block.Put(r.data)
137 r.data = nil
138 return
139 default:
140 return
141 }
142 }
143 if bn == 0 {
144
145 bn = copy(buf, r.data[r.idx:])
146 r.idx += bn
147 if r.idx == len(r.data) {
148
149 r.idx = 0
150 }
151 }
152 buf = buf[bn:]
153 n += bn
154 r.handler(bn)
155 }
156 return
157 }
158
159
160
161
162
163 func (r *Reader) read(buf []byte) (int, error) {
164 block := r.frame.Blocks.Block
165 _, err := block.Read(r.frame, r.src, r.cum)
166 if err != nil {
167 return 0, err
168 }
169 var direct bool
170 dst := r.data[:cap(r.data)]
171 if len(buf) >= len(dst) {
172
173 direct = true
174 dst = buf
175 }
176 dst, err = block.Uncompress(r.frame, dst, r.dict, true)
177 if err != nil {
178 return 0, err
179 }
180 if !r.frame.Descriptor.Flags.BlockIndependence() {
181 if len(r.dict)+len(dst) > 128*1024 {
182 preserveSize := 64*1024 - len(dst)
183 if preserveSize < 0 {
184 preserveSize = 0
185 }
186 r.dict = r.dict[len(r.dict)-preserveSize:]
187 }
188 r.dict = append(r.dict, dst...)
189 }
190 r.cum += uint32(len(dst))
191 if direct {
192 return len(dst), nil
193 }
194 r.data = dst
195 return 0, nil
196 }
197
198
199
200
201 func (r *Reader) Reset(reader io.Reader) {
202 if r.data != nil {
203 lz4block.Put(r.data)
204 r.data = nil
205 }
206 r.frame.Reset(r.num)
207 r.state.reset()
208 r.src = reader
209 r.reads = nil
210 }
211
212
213 func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
214 switch r.state.state {
215 case closedState, errorState:
216 return 0, r.state.err
217 case newState:
218 if err = r.init(); r.state.next(err) {
219 return
220 }
221 default:
222 return 0, r.state.fail()
223 }
224 defer r.state.nextd(&err)
225
226 var data []byte
227 if r.isNotConcurrent() {
228 size := r.frame.Descriptor.Flags.BlockSizeIndex()
229 data = size.Get()
230 defer lz4block.Put(data)
231 }
232 for {
233 var bn int
234 var dst []byte
235 if r.isNotConcurrent() {
236 bn, err = r.read(data)
237 dst = data[:bn]
238 } else {
239 lz4block.Put(dst)
240 dst = <-r.reads
241 bn = len(dst)
242 if bn == 0 {
243
244 err = r.frame.Blocks.ErrorR()
245 }
246 }
247 switch err {
248 case nil:
249 case io.EOF:
250 err = r.frame.CloseR(r.src)
251 return
252 default:
253 return
254 }
255 r.handler(bn)
256 bn, err = w.Write(dst)
257 n += int64(bn)
258 if err != nil {
259 return
260 }
261 }
262 }
263
264
265 func ValidFrameHeader(in []byte) (bool, error) {
266 f := lz4stream.NewFrame()
267 err := f.ParseHeaders(bytes.NewReader(in))
268 if err == nil {
269 return true, nil
270 }
271 if err == lz4errors.ErrInvalidFrame {
272 return false, nil
273 }
274 return false, err
275 }
276
View as plain text