1
2
3
4
5
6 package s2
7
8 import (
9 "errors"
10 "fmt"
11 "io"
12 "io/ioutil"
13 "math"
14 "runtime"
15 "sync"
16 )
17
18
// ErrCantSeek is the error value returned when a seekable view of the
// stream cannot be provided. Reason holds the human-readable cause.
type ErrCantSeek struct {
	Reason string
}

// Error implements the error interface. The message is a fixed prefix
// followed by Reason.
func (c ErrCantSeek) Error() string {
	msg := fmt.Sprintf("s2: Can't seek because %s", c.Reason)
	return msg
}
27
28
29
30
31 func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
32 nr := Reader{
33 r: r,
34 maxBlock: maxBlockSize,
35 }
36 for _, opt := range opts {
37 if err := opt(&nr); err != nil {
38 nr.err = err
39 return &nr
40 }
41 }
42 nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
43 if nr.lazyBuf > 0 {
44 nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
45 } else {
46 nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
47 }
48 nr.readHeader = nr.ignoreStreamID
49 nr.paramsOK = true
50 return &nr
51 }
52
53
// ReaderOption configures a Reader. NewReader calls each option with the
// Reader under construction; a non-nil return value stops construction and
// becomes the Reader's stored error.
type ReaderOption func(*Reader) error
55
56
57
58
59
60
61
62
63
64 func ReaderMaxBlockSize(blockSize int) ReaderOption {
65 return func(r *Reader) error {
66 if blockSize > maxBlockSize || blockSize <= 0 {
67 return errors.New("s2: block size too large. Must be <= 4MB and > 0")
68 }
69 if r.lazyBuf == 0 && blockSize < defaultBlockSize {
70 r.lazyBuf = blockSize
71 }
72 r.maxBlock = blockSize
73 return nil
74 }
75 }
76
77
78
79
80
81
82 func ReaderAllocBlock(blockSize int) ReaderOption {
83 return func(r *Reader) error {
84 if blockSize > maxBlockSize || blockSize < 1024 {
85 return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
86 }
87 r.lazyBuf = blockSize
88 return nil
89 }
90 }
91
92
93
94
95 func ReaderIgnoreStreamIdentifier() ReaderOption {
96 return func(r *Reader) error {
97 r.ignoreStreamID = true
98 return nil
99 }
100 }
101
102
103
104
105
106
107
108
109 func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
110 return func(r *Reader) error {
111 if id < 0x80 || id > 0xfd {
112 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
113 }
114 r.skippableCB[id-0x80] = fn
115 return nil
116 }
117 }
118
119
120 func ReaderIgnoreCRC() ReaderOption {
121 return func(r *Reader) error {
122 r.ignoreCRC = true
123 return nil
124 }
125 }
126
127
// Reader decompresses a framed stream read from an underlying io.Reader.
// Create one with NewReader.
type Reader struct {
	r           io.Reader                         // underlying compressed input
	err         error                             // sticky error; reported by later read calls
	decoded     []byte                            // storage for the most recently decoded block
	buf         []byte                            // scratch for chunk headers and raw chunk payloads
	skippableCB [0xff - 0x80]func(r io.Reader) error // skippable-chunk callbacks, indexed by chunk id - 0x80
	blockStart  int64                             // uncompressed stream offset of decoded[0]
	index       *Index                            // optional seek index; nil when none is loaded

	// decoded[i:j] contains decoded bytes that have not yet been handed
	// to the caller.
	i, j int
	// maxBlock is the largest decoded block size that is accepted.
	maxBlock int
	// maxBufSize is the largest chunk length buf may be grown to hold.
	maxBufSize int
	// lazyBuf is the block size used to dimension the initial buf
	// allocation; zero selects the default block size.
	lazyBuf        int
	readHeader     bool // stream identifier has been seen (or is not required)
	paramsOK       bool // all options applied successfully; Reset is a no-op otherwise
	snappyFrame    bool // last stream identifier was the Snappy one; enforces the Snappy block limit
	ignoreStreamID bool // do not require a leading stream identifier
	ignoreCRC      bool // skip checksum comparison on decoded blocks
}
151
152
153
154
155 func (r *Reader) GetBufferCapacity() int {
156 return cap(r.buf)
157 }
158
159
160
161 func (r *Reader) ensureBufferSize(n int) bool {
162 if n > r.maxBufSize {
163 r.err = ErrCorrupt
164 return false
165 }
166 if cap(r.buf) >= n {
167 return true
168 }
169
170 r.buf = make([]byte, n)
171 return true
172 }
173
174
175
176
177 func (r *Reader) Reset(reader io.Reader) {
178 if !r.paramsOK {
179 return
180 }
181 r.index = nil
182 r.r = reader
183 r.err = nil
184 r.i = 0
185 r.j = 0
186 r.blockStart = 0
187 r.readHeader = r.ignoreStreamID
188 }
189
190 func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
191 if _, r.err = io.ReadFull(r.r, p); r.err != nil {
192 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
193 r.err = ErrCorrupt
194 }
195 return false
196 }
197 return true
198 }
199
200
201
202
203
204 func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
205 if id < 0x80 {
206 r.err = fmt.Errorf("internal error: skippable id < 0x80")
207 return false
208 }
209 if fn := r.skippableCB[id-0x80]; fn != nil {
210 rd := io.LimitReader(r.r, int64(n))
211 r.err = fn(rd)
212 if r.err != nil {
213 return false
214 }
215 _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
216 return r.err == nil
217 }
218 if rs, ok := r.r.(io.ReadSeeker); ok {
219 _, err := rs.Seek(int64(n), io.SeekCurrent)
220 if err == nil {
221 return true
222 }
223 if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
224 r.err = ErrCorrupt
225 return false
226 }
227 }
228 for n > 0 {
229 if n < len(tmp) {
230 tmp = tmp[:n]
231 }
232 if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
233 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
234 r.err = ErrCorrupt
235 }
236 return false
237 }
238 n -= len(tmp)
239 }
240 return true
241 }
242
243
// Read satisfies the io.Reader interface. It copies pending decoded bytes
// into p, decoding further chunks from the underlying stream as needed.
//
// Once an error has occurred it is stored on the Reader and returned by
// every subsequent call. A clean end of input between chunks is reported
// as io.EOF; a stream that ends inside a chunk is reported as ErrCorrupt.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve from the current decoded block first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: one type byte followed by a 3-byte little-endian
		// length. EOF is acceptable only here, between chunks.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk must be the stream identifier.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// The previous block is fully consumed; advance the stream
			// offset past it.
			r.blockStart += int64(r.j)
			// The payload starts with the checksum.
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// Little-endian checksum of the uncompressed data.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			// Grow the output buffer only when needed, and never beyond
			// the configured maximum block size.
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// The payload starts with the checksum.
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			// Only the checksum goes through buf; the data itself is read
			// straight into r.decoded below.
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// The identifier body must match one of the two known magic
			// strings; which one decides whether Snappy limits apply.
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Any other chunk type up to 0x7f is treated as unskippable,
			// so decoding cannot continue.
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Everything from 0x80 up is handled as a skippable chunk.
		if chunkLen > maxChunkSize {
			// Declared length exceeds the maximum chunk size.
			r.err = ErrUnsupported
			return 0, r.err
		}

		// Consume the payload (or hand it to a registered callback).
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
401
402
403
404
405
406
407
408 func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
409 if r.i > 0 || r.j > 0 || r.blockStart > 0 {
410 return 0, errors.New("DecodeConcurrent called after ")
411 }
412 if concurrent <= 0 {
413 concurrent = runtime.NumCPU()
414 }
415
416
417 var errMu sync.Mutex
418 var aErr error
419 setErr := func(e error) (ok bool) {
420 errMu.Lock()
421 defer errMu.Unlock()
422 if e == nil {
423 return aErr == nil
424 }
425 if aErr == nil {
426 aErr = e
427 }
428 return false
429 }
430 hasErr := func() (ok bool) {
431 errMu.Lock()
432 v := aErr != nil
433 errMu.Unlock()
434 return v
435 }
436
437 var aWritten int64
438 toRead := make(chan []byte, concurrent)
439 writtenBlocks := make(chan []byte, concurrent)
440 queue := make(chan chan []byte, concurrent)
441 reUse := make(chan chan []byte, concurrent)
442 for i := 0; i < concurrent; i++ {
443 toRead <- make([]byte, 0, r.maxBufSize)
444 writtenBlocks <- make([]byte, 0, r.maxBufSize)
445 reUse <- make(chan []byte, 1)
446 }
447
448 var wg sync.WaitGroup
449 wg.Add(1)
450 go func() {
451 defer wg.Done()
452 for toWrite := range queue {
453 entry := <-toWrite
454 reUse <- toWrite
455 if hasErr() || entry == nil {
456 if entry != nil {
457 writtenBlocks <- entry
458 }
459 continue
460 }
461 if hasErr() {
462 writtenBlocks <- entry
463 continue
464 }
465 n, err := w.Write(entry)
466 want := len(entry)
467 writtenBlocks <- entry
468 if err != nil {
469 setErr(err)
470 continue
471 }
472 if n != want {
473 setErr(io.ErrShortWrite)
474 continue
475 }
476 aWritten += int64(n)
477 }
478 }()
479
480 defer func() {
481 if r.err != nil {
482 setErr(r.err)
483 } else if err != nil {
484 setErr(err)
485 }
486 close(queue)
487 wg.Wait()
488 if err == nil {
489 err = aErr
490 }
491 written = aWritten
492 }()
493
494
495 for !hasErr() {
496 if !r.readFull(r.buf[:4], true) {
497 if r.err == io.EOF {
498 r.err = nil
499 }
500 return 0, r.err
501 }
502 chunkType := r.buf[0]
503 if !r.readHeader {
504 if chunkType != chunkTypeStreamIdentifier {
505 r.err = ErrCorrupt
506 return 0, r.err
507 }
508 r.readHeader = true
509 }
510 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
511
512
513
514 switch chunkType {
515 case chunkTypeCompressedData:
516 r.blockStart += int64(r.j)
517
518 if chunkLen < checksumSize {
519 r.err = ErrCorrupt
520 return 0, r.err
521 }
522 if chunkLen > r.maxBufSize {
523 r.err = ErrCorrupt
524 return 0, r.err
525 }
526 orgBuf := <-toRead
527 buf := orgBuf[:chunkLen]
528
529 if !r.readFull(buf, false) {
530 return 0, r.err
531 }
532
533 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
534 buf = buf[checksumSize:]
535
536 n, err := DecodedLen(buf)
537 if err != nil {
538 r.err = err
539 return 0, r.err
540 }
541 if r.snappyFrame && n > maxSnappyBlockSize {
542 r.err = ErrCorrupt
543 return 0, r.err
544 }
545
546 if n > r.maxBlock {
547 r.err = ErrCorrupt
548 return 0, r.err
549 }
550 wg.Add(1)
551
552 decoded := <-writtenBlocks
553 entry := <-reUse
554 queue <- entry
555 go func() {
556 defer wg.Done()
557 decoded = decoded[:n]
558 _, err := Decode(decoded, buf)
559 toRead <- orgBuf
560 if err != nil {
561 writtenBlocks <- decoded
562 setErr(err)
563 entry <- nil
564 return
565 }
566 if !r.ignoreCRC && crc(decoded) != checksum {
567 writtenBlocks <- decoded
568 setErr(ErrCRC)
569 entry <- nil
570 return
571 }
572 entry <- decoded
573 }()
574 continue
575
576 case chunkTypeUncompressedData:
577
578
579 if chunkLen < checksumSize {
580 r.err = ErrCorrupt
581 return 0, r.err
582 }
583 if chunkLen > r.maxBufSize {
584 r.err = ErrCorrupt
585 return 0, r.err
586 }
587
588 orgBuf := <-writtenBlocks
589 buf := orgBuf[:checksumSize]
590 if !r.readFull(buf, false) {
591 return 0, r.err
592 }
593 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
594
595 n := chunkLen - checksumSize
596
597 if r.snappyFrame && n > maxSnappyBlockSize {
598 r.err = ErrCorrupt
599 return 0, r.err
600 }
601 if n > r.maxBlock {
602 r.err = ErrCorrupt
603 return 0, r.err
604 }
605
606 buf = orgBuf[:n]
607 if !r.readFull(buf, false) {
608 return 0, r.err
609 }
610
611 if !r.ignoreCRC && crc(buf) != checksum {
612 r.err = ErrCRC
613 return 0, r.err
614 }
615 entry := <-reUse
616 queue <- entry
617 entry <- buf
618 continue
619
620 case chunkTypeStreamIdentifier:
621
622 if chunkLen != len(magicBody) {
623 r.err = ErrCorrupt
624 return 0, r.err
625 }
626 if !r.readFull(r.buf[:len(magicBody)], false) {
627 return 0, r.err
628 }
629 if string(r.buf[:len(magicBody)]) != magicBody {
630 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
631 r.err = ErrCorrupt
632 return 0, r.err
633 } else {
634 r.snappyFrame = true
635 }
636 } else {
637 r.snappyFrame = false
638 }
639 continue
640 }
641
642 if chunkType <= 0x7f {
643
644
645 r.err = ErrUnsupported
646 return 0, r.err
647 }
648
649
650 if chunkLen > maxChunkSize {
651
652 r.err = ErrUnsupported
653 return 0, r.err
654 }
655
656
657 if !r.skippable(r.buf, chunkLen, false, chunkType) {
658 return 0, r.err
659 }
660 }
661 return 0, r.err
662 }
663
664
665
666
667
668
669 func (r *Reader) Skip(n int64) error {
670 if n < 0 {
671 return errors.New("attempted negative skip")
672 }
673 if r.err != nil {
674 return r.err
675 }
676
677 for n > 0 {
678 if r.i < r.j {
679
680
681 left := int64(r.j - r.i)
682 if left >= n {
683 tmp := int64(r.i) + n
684 if tmp > math.MaxInt32 {
685 return errors.New("s2: internal overflow in skip")
686 }
687 r.i = int(tmp)
688 return nil
689 }
690 n -= int64(r.j - r.i)
691 r.i = r.j
692 }
693
694
695 if !r.readFull(r.buf[:4], true) {
696 if r.err == io.EOF {
697 r.err = io.ErrUnexpectedEOF
698 }
699 return r.err
700 }
701 chunkType := r.buf[0]
702 if !r.readHeader {
703 if chunkType != chunkTypeStreamIdentifier {
704 r.err = ErrCorrupt
705 return r.err
706 }
707 r.readHeader = true
708 }
709 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
710
711
712
713 switch chunkType {
714 case chunkTypeCompressedData:
715 r.blockStart += int64(r.j)
716
717 if chunkLen < checksumSize {
718 r.err = ErrCorrupt
719 return r.err
720 }
721 if !r.ensureBufferSize(chunkLen) {
722 if r.err == nil {
723 r.err = ErrUnsupported
724 }
725 return r.err
726 }
727 buf := r.buf[:chunkLen]
728 if !r.readFull(buf, false) {
729 return r.err
730 }
731 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
732 buf = buf[checksumSize:]
733
734 dLen, err := DecodedLen(buf)
735 if err != nil {
736 r.err = err
737 return r.err
738 }
739 if dLen > r.maxBlock {
740 r.err = ErrCorrupt
741 return r.err
742 }
743
744 if int64(dLen) > n {
745 if len(r.decoded) < dLen {
746 r.decoded = make([]byte, dLen)
747 }
748 if _, err := Decode(r.decoded, buf); err != nil {
749 r.err = err
750 return r.err
751 }
752 if crc(r.decoded[:dLen]) != checksum {
753 r.err = ErrCorrupt
754 return r.err
755 }
756 } else {
757
758 n -= int64(dLen)
759 r.blockStart += int64(dLen)
760 dLen = 0
761 }
762 r.i, r.j = 0, dLen
763 continue
764 case chunkTypeUncompressedData:
765 r.blockStart += int64(r.j)
766
767 if chunkLen < checksumSize {
768 r.err = ErrCorrupt
769 return r.err
770 }
771 if !r.ensureBufferSize(chunkLen) {
772 if r.err != nil {
773 r.err = ErrUnsupported
774 }
775 return r.err
776 }
777 buf := r.buf[:checksumSize]
778 if !r.readFull(buf, false) {
779 return r.err
780 }
781 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
782
783 n2 := chunkLen - checksumSize
784 if n2 > len(r.decoded) {
785 if n2 > r.maxBlock {
786 r.err = ErrCorrupt
787 return r.err
788 }
789 r.decoded = make([]byte, n2)
790 }
791 if !r.readFull(r.decoded[:n2], false) {
792 return r.err
793 }
794 if int64(n2) < n {
795 if crc(r.decoded[:n2]) != checksum {
796 r.err = ErrCorrupt
797 return r.err
798 }
799 }
800 r.i, r.j = 0, n2
801 continue
802 case chunkTypeStreamIdentifier:
803
804 if chunkLen != len(magicBody) {
805 r.err = ErrCorrupt
806 return r.err
807 }
808 if !r.readFull(r.buf[:len(magicBody)], false) {
809 return r.err
810 }
811 if string(r.buf[:len(magicBody)]) != magicBody {
812 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
813 r.err = ErrCorrupt
814 return r.err
815 }
816 }
817
818 continue
819 }
820
821 if chunkType <= 0x7f {
822
823 r.err = ErrUnsupported
824 return r.err
825 }
826 if chunkLen > maxChunkSize {
827 r.err = ErrUnsupported
828 return r.err
829 }
830
831
832 if !r.skippable(r.buf, chunkLen, false, chunkType) {
833 return r.err
834 }
835 }
836 return nil
837 }
838
839
840
// ReadSeeker wraps a Reader and adds Seek and ReadAt on top of it.
// Obtain one through (*Reader).ReadSeeker.
type ReadSeeker struct {
	*Reader
	// readAtMu serializes ReadAt calls, since each one seeks and then
	// reads on the shared underlying Reader.
	readAtMu sync.Mutex
}
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859 func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
860
861 if len(index) != 0 {
862 if r.index == nil {
863 r.index = &Index{}
864 }
865 if _, err := r.index.Load(index); err != nil {
866 return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
867 }
868 }
869
870
871 rs, ok := r.r.(io.ReadSeeker)
872 if !ok {
873 if !random {
874 return &ReadSeeker{Reader: r}, nil
875 }
876 return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
877 }
878
879 if r.index != nil {
880
881 return &ReadSeeker{Reader: r}, nil
882 }
883
884
885 r.index = &Index{}
886
887
888 pos, err := rs.Seek(0, io.SeekCurrent)
889 if err != nil {
890 return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
891 }
892 err = r.index.LoadStream(rs)
893 if err != nil {
894 if err == ErrUnsupported {
895
896 if !random {
897 _, err = rs.Seek(pos, io.SeekStart)
898 if err != nil {
899 return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
900 }
901 r.index = nil
902 return &ReadSeeker{Reader: r}, nil
903 }
904 return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
905 }
906 return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
907 }
908
909
910 _, err = rs.Seek(pos, io.SeekStart)
911 if err != nil {
912 return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
913 }
914 return &ReadSeeker{Reader: r}, nil
915 }
916
917
// Seek moves the read position in the decompressed stream. offset and
// whence are interpreted as in io.Seeker, and the new absolute offset is
// returned.
//
// io.SeekEnd needs a loaded index and returns ErrUnsupported without one.
// Without an index, or when the input is not an io.ReadSeeker, only
// positions inside the current block or further ahead can be reached;
// seeking backwards returns ErrUnsupported. A stored io.EOF is cleared
// before seeking; any other stored error is returned unchanged.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if r.err != nil {
		if !errors.Is(r.err, io.EOF) {
			return 0, r.err
		}
		// Having reached EOF does not prevent seeking elsewhere.
		r.err = nil
	}

	// Translate offset/whence into an absolute uncompressed offset.
	absOffset := offset

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		absOffset = r.blockStart + int64(r.i) + offset
	case io.SeekEnd:
		if r.index == nil {
			return 0, ErrUnsupported
		}
		absOffset = r.index.TotalUncompressed + offset
	default:
		// Unknown whence; note that this error is stored on the Reader.
		r.err = ErrUnsupported
		return 0, r.err
	}

	if absOffset < 0 {
		return 0, errors.New("seek before start of file")
	}

	if !r.readHeader {
		// A zero-length Read consumes the stream identifier.
		_, r.err = r.Read([]byte{})
		if r.err != nil {
			return 0, r.err
		}
	}

	// If the target lies inside the current decoded block, just move the
	// read index; no input access is needed.
	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
		r.i = int(absOffset - r.blockStart)
		return r.blockStart + int64(r.i), nil
	}

	rs, ok := r.r.(io.ReadSeeker)
	if r.index == nil || !ok {
		// No way to jump in the input: forward seeks are done by
		// skipping, backward seeks are not possible.
		currOffset := r.blockStart + int64(r.i)
		if absOffset >= currOffset {
			err := r.Skip(absOffset - currOffset)
			return r.blockStart + int64(r.i), err
		}
		return 0, ErrUnsupported
	}

	// Look up the compressed (c) and uncompressed (u) offsets to start from.
	c, u, err := r.index.Find(absOffset)
	if err != nil {
		return r.blockStart + int64(r.i), err
	}

	// Position the input at the compressed offset.
	_, err = rs.Seek(c, io.SeekStart)
	if err != nil {
		return 0, err
	}

	// Mark the current block as consumed. blockStart is set so that adding
	// r.j when the next block is read yields u.
	r.i = r.j
	r.blockStart = u - int64(r.j)
	if u < absOffset {
		// Skip forward to the requested offset.
		return absOffset, r.Skip(absOffset - u)
	}
	if u > absOffset {
		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
	}
	return absOffset, nil
}
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019 func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
1020 r.readAtMu.Lock()
1021 defer r.readAtMu.Unlock()
1022 _, err := r.Seek(offset, io.SeekStart)
1023 if err != nil {
1024 return 0, err
1025 }
1026 n := 0
1027 for n < len(p) {
1028 n2, err := r.Read(p[n:])
1029 if err != nil {
1030
1031 return n + n2, err
1032 }
1033 n += n2
1034 }
1035 return n, nil
1036 }
1037
1038
1039 func (r *Reader) ReadByte() (byte, error) {
1040 if r.err != nil {
1041 return 0, r.err
1042 }
1043 if r.i < r.j {
1044 c := r.decoded[r.i]
1045 r.i++
1046 return c, nil
1047 }
1048 var tmp [1]byte
1049 for i := 0; i < 10; i++ {
1050 n, err := r.Read(tmp[:])
1051 if err != nil {
1052 return 0, err
1053 }
1054 if n == 1 {
1055 return tmp[0], nil
1056 }
1057 }
1058 return 0, io.ErrNoProgress
1059 }
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069 func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
1070 if id < 0x80 || id >= chunkTypePadding {
1071 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
1072 }
1073 r.skippableCB[id-0x80] = fn
1074 return nil
1075 }
1076
View as plain text