1
2
3
4
5
6
7
8
9
10
11
12
13 package readahead
14
15 import (
16 "errors"
17 "fmt"
18 "io"
19 )
20
21 const (
22
23 DefaultBuffers = 4
24
25
26 DefaultBufferSize = 1 << 20
27 )
28
29 type seekable struct {
30 *reader
31 }
32
33 type ReadSeekCloser interface {
34 io.ReadCloser
35 io.Seeker
36 }
37
38 type reader struct {
39 in io.Reader
40 closer io.Closer
41 ready chan *buffer
42 reuse chan *buffer
43 exit chan struct{}
44 buffers int
45 size int
46 err error
47 cur *buffer
48 exited chan struct{}
49 bufs [][]byte
50 }
51
52
53
54
55
56
57
58
59
60
61
62 func NewReader(rd io.Reader) io.ReadCloser {
63 if rd == nil {
64 return nil
65 }
66
67 ret, err := NewReaderSize(rd, DefaultBuffers, DefaultBufferSize)
68
69
70 if err != nil {
71 panic("unexpected error:" + err.Error())
72 }
73 return ret
74 }
75
76
77
78
79
80
81
82
83
84
85
86
87 func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
88 if rd == nil {
89 return nil
90 }
91
92 ret, err := NewReadCloserSize(rd, DefaultBuffers, DefaultBufferSize)
93
94
95 if err != nil {
96 panic("unexpected error:" + err.Error())
97 }
98 return ret
99 }
100
101
102
103
104
105
106
107
108
109 func NewReadSeeker(rd io.ReadSeeker) ReadSeekCloser {
110
111 res, _ := NewReader(rd).(ReadSeekCloser)
112 return res
113 }
114
115
116
117
118
119
120
121
122
123
124 func NewReadSeekCloser(rd ReadSeekCloser) ReadSeekCloser {
125
126 res, _ := NewReadCloser(rd).(ReadSeekCloser)
127 return res
128 }
129
130
131
132
133 func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) {
134 if size <= 0 {
135 return nil, fmt.Errorf("buffer size too small")
136 }
137 if buffers <= 0 {
138 return nil, fmt.Errorf("number of buffers too small")
139 }
140 if rd == nil {
141 return nil, fmt.Errorf("nil input reader supplied")
142 }
143 a := &reader{}
144 if _, ok := rd.(io.Seeker); ok {
145 res = &seekable{a}
146 } else {
147 res = a
148 }
149 a.init(rd, buffers, size)
150 return
151 }
152
153
154
155
156 func NewReaderBuffer(rd io.Reader, buffers [][]byte) (res io.ReadCloser, err error) {
157 if len(buffers) == 0 {
158 return nil, fmt.Errorf("number of buffers too small")
159 }
160 sz := 0
161 for _, buf := range buffers {
162 if len(buf) == 0 {
163 return nil, fmt.Errorf("zero size buffer sent")
164 }
165 if sz == 0 {
166 sz = len(buf)
167 }
168 if sz != len(buf) {
169 return nil, fmt.Errorf("buffers should have similar size")
170 }
171 }
172 if rd == nil {
173 return nil, fmt.Errorf("nil input reader supplied")
174 }
175 a := &reader{}
176 if _, ok := rd.(io.Seeker); ok {
177 res = &seekable{a}
178 } else {
179 res = a
180 }
181 a.initBuffers(rd, buffers, sz)
182
183 return
184 }
185
186
187
188
189 func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (res io.ReadCloser, err error) {
190 if size <= 0 {
191 return nil, fmt.Errorf("buffer size too small")
192 }
193 if buffers <= 0 {
194 return nil, fmt.Errorf("number of buffers too small")
195 }
196 if rc == nil {
197 return nil, fmt.Errorf("nil input reader supplied")
198 }
199 a := &reader{closer: rc}
200 if _, ok := rc.(io.Seeker); ok {
201 res = &seekable{a}
202 } else {
203 res = a
204 }
205 a.init(rc, buffers, size)
206 return
207 }
208
209
210
211
212 func NewReadCloserBuffer(rc io.ReadCloser, buffers [][]byte) (res io.ReadCloser, err error) {
213 if len(buffers) == 0 {
214 return nil, fmt.Errorf("number of buffers too small")
215 }
216 sz := 0
217 for _, buf := range buffers {
218 if len(buf) == 0 {
219 return nil, fmt.Errorf("zero size buffer sent")
220 }
221 if sz == 0 {
222 sz = len(buf)
223 }
224 if sz != len(buf) {
225 return nil, fmt.Errorf("buffers should have similar size")
226 }
227 }
228
229 if rc == nil {
230 return nil, fmt.Errorf("nil input reader supplied")
231 }
232 a := &reader{closer: rc}
233 if _, ok := rc.(io.Seeker); ok {
234 res = &seekable{a}
235 } else {
236 res = a
237 }
238 a.initBuffers(rc, buffers, sz)
239 return
240 }
241
242
243
244
245 func NewReadSeekerSize(rd io.ReadSeeker, buffers, size int) (res ReadSeekCloser, err error) {
246 reader, err := NewReaderSize(rd, buffers, size)
247 if err != nil {
248 return nil, err
249 }
250
251 res, _ = reader.(ReadSeekCloser)
252 return
253 }
254
255
256
257
258 func NewReadSeekCloserSize(rd ReadSeekCloser, buffers, size int) (res ReadSeekCloser, err error) {
259 reader, err := NewReadCloserSize(rd, buffers, size)
260 if err != nil {
261 return nil, err
262 }
263
264 res, _ = reader.(ReadSeekCloser)
265 return
266 }
267
268
269
270
271 func NewReadSeekCloserBuffer(rd ReadSeekCloser, buffers [][]byte) (res ReadSeekCloser, err error) {
272 reader, err := NewReadCloserBuffer(rd, buffers)
273 if err != nil {
274 return nil, err
275 }
276
277 res, _ = reader.(ReadSeekCloser)
278 return
279 }
280
281
282 func (a *reader) init(rd io.Reader, buffers, size int) {
283 x := make([]byte, buffers*size)
284 bufs := make([][]byte, buffers)
285 for i := range bufs {
286 bufs[i] = x[i*size : (i+1)*size : (i+1)*size]
287 }
288 a.initBuffers(rd, bufs, size)
289 }
290
291
292 func (a *reader) initBuffers(rd io.Reader, buffers [][]byte, size int) {
293 a.in = rd
294 a.ready = make(chan *buffer, len(buffers))
295 a.reuse = make(chan *buffer, len(buffers))
296 a.exit = make(chan struct{})
297 a.exited = make(chan struct{})
298 a.buffers = len(buffers)
299 a.size = size
300 a.cur = nil
301 a.err = nil
302 a.bufs = buffers
303
304
305 for _, buf := range buffers {
306 a.reuse <- newBuffer(buf)
307 }
308
309
310 go func() {
311
312 defer close(a.exited)
313 defer close(a.ready)
314 var atEOF bool
315 for {
316 select {
317 case b := <-a.reuse:
318 if atEOF {
319
320 b.err = io.EOF
321 b.buf = b.buf[:0]
322 b.offset = 0
323 a.ready <- b
324 return
325 }
326 err := b.read(a.in)
327
328 if err == io.EOF && len(b.buf) > 0 {
329 atEOF = true
330 err = nil
331 b.err = nil
332 }
333 a.ready <- b
334 if err != nil {
335 return
336 }
337 case <-a.exit:
338 return
339 }
340 }
341 }()
342 }
343
344
345
346 func (a *reader) fill() (err error) {
347 if a.cur.isEmpty() {
348 if a.cur != nil {
349 a.reuse <- a.cur
350 a.cur = nil
351 }
352 b, ok := <-a.ready
353 if !ok {
354 if a.err == nil {
355 a.err = errors.New("readahead: read after Close")
356 }
357 return a.err
358 }
359 a.cur = b
360 }
361 return nil
362 }
363
364
365 func (a *reader) Read(p []byte) (n int, err error) {
366 if a.err != nil {
367 return 0, a.err
368 }
369
370 err = a.fill()
371 if err != nil {
372 return 0, err
373 }
374
375
376 n = copy(p, a.cur.buffer())
377 a.cur.inc(n)
378
379 if a.cur.isEmpty() {
380
381 if a.cur != nil {
382
383 a.err = a.cur.err
384 a.reuse <- a.cur
385 a.cur = nil
386 }
387 return n, a.err
388 }
389 return n, nil
390 }
391
392 func (a *seekable) Seek(offset int64, whence int) (res int64, err error) {
393
394 seeker, _ := a.in.(io.Seeker)
395
396 select {
397 case <-a.exited:
398 case a.exit <- struct{}{}:
399 <-a.exited
400 }
401 if whence == io.SeekCurrent {
402
403
404 err = nil
405 for a.cur != nil {
406 if err = a.fill(); err == nil && a.cur != nil {
407 offset -= int64(len(a.cur.buffer()))
408 a.cur.offset = len(a.cur.buf)
409 }
410 }
411 }
412
413 if res, err = seeker.Seek(offset, whence); err == nil {
414
415 a.initBuffers(a.in, a.bufs, a.size)
416 }
417 return
418 }
419
420
421
422
423 func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
424 if a.err != nil {
425 return 0, a.err
426 }
427 n = 0
428 for {
429 err = a.fill()
430 if err != nil {
431 return n, err
432 }
433 n2, err := w.Write(a.cur.buffer())
434 a.cur.inc(n2)
435 n += int64(n2)
436 if err != nil {
437 return n, err
438 }
439 if a.cur.err != nil {
440
441 if a.cur.err == io.EOF {
442 a.err = a.cur.err
443 return n, nil
444 }
445 a.err = a.cur.err
446 return n, a.cur.err
447 }
448 }
449 }
450
451
452
453 func (a *reader) Close() (err error) {
454 select {
455 case <-a.exited:
456 case a.exit <- struct{}{}:
457 <-a.exited
458 }
459 if a.closer != nil {
460
461 c := a.closer
462 a.closer = nil
463 return c.Close()
464 }
465 a.err = errors.New("readahead: read after Close")
466 return nil
467 }
468
469
470
471
472 type buffer struct {
473 buf []byte
474 err error
475 offset int
476 size int
477 }
478
479 func newBuffer(buf []byte) *buffer {
480 return &buffer{buf: buf, err: nil, size: len(buf)}
481 }
482
483
484
485 func (b *buffer) isEmpty() bool {
486 if b == nil {
487 return true
488 }
489 if len(b.buf)-b.offset <= 0 {
490 return true
491 }
492 return false
493 }
494
495
496
497
498 func (b *buffer) read(rd io.Reader) (err error) {
499 defer func() {
500 if r := recover(); r != nil {
501 err = fmt.Errorf("panic reading: %v", r)
502 b.err = err
503 }
504 }()
505
506 var n int
507 buf := b.buf[0:b.size]
508 for n < b.size {
509 n2, err := rd.Read(buf)
510 n += n2
511 if err != nil {
512 b.err = err
513 break
514 }
515 buf = buf[n2:]
516 }
517 b.buf = b.buf[0:n]
518 b.offset = 0
519 return b.err
520 }
521
522
523 func (b *buffer) buffer() []byte {
524 return b.buf[b.offset:]
525 }
526
527
528 func (b *buffer) inc(n int) {
529 b.offset += n
530 }
531
View as plain text