...

Source file src/github.com/klauspost/compress/s2/reader.go

Documentation: github.com/klauspost/compress/s2

     1  // Copyright 2011 The Snappy-Go Authors. All rights reserved.
     2  // Copyright (c) 2019+ Klaus Post. All rights reserved.
     3  // Use of this source code is governed by a BSD-style
     4  // license that can be found in the LICENSE file.
     5  
     6  package s2
     7  
     8  import (
     9  	"errors"
    10  	"fmt"
    11  	"io"
    12  	"io/ioutil"
    13  	"math"
    14  	"runtime"
    15  	"sync"
    16  )
    17  
    18  // ErrCantSeek is returned if the stream cannot be seeked.
    19  type ErrCantSeek struct {
    20  	Reason string
    21  }
    22  
    23  // Error returns the error as string.
    24  func (e ErrCantSeek) Error() string {
    25  	return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
    26  }
    27  
    28  // NewReader returns a new Reader that decompresses from r, using the framing
    29  // format described at
    30  // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
    31  func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
    32  	nr := Reader{
    33  		r:        r,
    34  		maxBlock: maxBlockSize,
    35  	}
    36  	for _, opt := range opts {
    37  		if err := opt(&nr); err != nil {
    38  			nr.err = err
    39  			return &nr
    40  		}
    41  	}
    42  	nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
    43  	if nr.lazyBuf > 0 {
    44  		nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
    45  	} else {
    46  		nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
    47  	}
    48  	nr.readHeader = nr.ignoreStreamID
    49  	nr.paramsOK = true
    50  	return &nr
    51  }
    52  
    53  // ReaderOption is an option for creating a decoder.
    54  type ReaderOption func(*Reader) error
    55  
    56  // ReaderMaxBlockSize allows to control allocations if the stream
    57  // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
    58  // Blocks must be this size or smaller to decompress,
    59  // otherwise the decoder will return ErrUnsupported.
    60  //
    61  // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
    62  //
    63  // Default is the maximum limit of 4MB.
    64  func ReaderMaxBlockSize(blockSize int) ReaderOption {
    65  	return func(r *Reader) error {
    66  		if blockSize > maxBlockSize || blockSize <= 0 {
    67  			return errors.New("s2: block size too large. Must be <= 4MB and > 0")
    68  		}
    69  		if r.lazyBuf == 0 && blockSize < defaultBlockSize {
    70  			r.lazyBuf = blockSize
    71  		}
    72  		r.maxBlock = blockSize
    73  		return nil
    74  	}
    75  }
    76  
    77  // ReaderAllocBlock allows to control upfront stream allocations
    78  // and not allocate for frames bigger than this initially.
    79  // If frames bigger than this is seen a bigger buffer will be allocated.
    80  //
    81  // Default is 1MB, which is default output size.
    82  func ReaderAllocBlock(blockSize int) ReaderOption {
    83  	return func(r *Reader) error {
    84  		if blockSize > maxBlockSize || blockSize < 1024 {
    85  			return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
    86  		}
    87  		r.lazyBuf = blockSize
    88  		return nil
    89  	}
    90  }
    91  
    92  // ReaderIgnoreStreamIdentifier will make the reader skip the expected
    93  // stream identifier at the beginning of the stream.
    94  // This can be used when serving a stream that has been forwarded to a specific point.
    95  func ReaderIgnoreStreamIdentifier() ReaderOption {
    96  	return func(r *Reader) error {
    97  		r.ignoreStreamID = true
    98  		return nil
    99  	}
   100  }
   101  
   102  // ReaderSkippableCB will register a callback for chuncks with the specified ID.
   103  // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
   104  // For each chunk with the ID, the callback is called with the content.
   105  // Any returned non-nil error will abort decompression.
   106  // Only one callback per ID is supported, latest sent will be used.
   107  // You can peek the stream, triggering the callback, by doing a Read with a 0
   108  // byte buffer.
   109  func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
   110  	return func(r *Reader) error {
   111  		if id < 0x80 || id > 0xfd {
   112  			return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
   113  		}
   114  		r.skippableCB[id-0x80] = fn
   115  		return nil
   116  	}
   117  }
   118  
   119  // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
   120  func ReaderIgnoreCRC() ReaderOption {
   121  	return func(r *Reader) error {
   122  		r.ignoreCRC = true
   123  		return nil
   124  	}
   125  }
   126  
   127  // Reader is an io.Reader that can read Snappy-compressed bytes.
   128  type Reader struct {
   129  	r           io.Reader
   130  	err         error
   131  	decoded     []byte
   132  	buf         []byte
   133  	skippableCB [0xff - 0x80]func(r io.Reader) error
   134  	blockStart  int64 // Uncompressed offset at start of current.
   135  	index       *Index
   136  
   137  	// decoded[i:j] contains decoded bytes that have not yet been passed on.
   138  	i, j int
   139  	// maximum block size allowed.
   140  	maxBlock int
   141  	// maximum expected buffer size.
   142  	maxBufSize int
   143  	// alloc a buffer this size if > 0.
   144  	lazyBuf        int
   145  	readHeader     bool
   146  	paramsOK       bool
   147  	snappyFrame    bool
   148  	ignoreStreamID bool
   149  	ignoreCRC      bool
   150  }
   151  
   152  // GetBufferCapacity returns the capacity of the internal buffer.
   153  // This might be useful to know when reusing the same reader in combination
   154  // with the lazy buffer option.
   155  func (r *Reader) GetBufferCapacity() int {
   156  	return cap(r.buf)
   157  }
   158  
   159  // ensureBufferSize will ensure that the buffer can take at least n bytes.
   160  // If false is returned the buffer exceeds maximum allowed size.
   161  func (r *Reader) ensureBufferSize(n int) bool {
   162  	if n > r.maxBufSize {
   163  		r.err = ErrCorrupt
   164  		return false
   165  	}
   166  	if cap(r.buf) >= n {
   167  		return true
   168  	}
   169  	// Realloc buffer.
   170  	r.buf = make([]byte, n)
   171  	return true
   172  }
   173  
   174  // Reset discards any buffered data, resets all state, and switches the Snappy
   175  // reader to read from r. This permits reusing a Reader rather than allocating
   176  // a new one.
   177  func (r *Reader) Reset(reader io.Reader) {
   178  	if !r.paramsOK {
   179  		return
   180  	}
   181  	r.index = nil
   182  	r.r = reader
   183  	r.err = nil
   184  	r.i = 0
   185  	r.j = 0
   186  	r.blockStart = 0
   187  	r.readHeader = r.ignoreStreamID
   188  }
   189  
   190  func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
   191  	if _, r.err = io.ReadFull(r.r, p); r.err != nil {
   192  		if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
   193  			r.err = ErrCorrupt
   194  		}
   195  		return false
   196  	}
   197  	return true
   198  }
   199  
   200  // skippable will skip n bytes.
   201  // If the supplied reader supports seeking that is used.
   202  // tmp is used as a temporary buffer for reading.
   203  // The supplied slice does not need to be the size of the read.
   204  func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
   205  	if id < 0x80 {
   206  		r.err = fmt.Errorf("internal error: skippable id < 0x80")
   207  		return false
   208  	}
   209  	if fn := r.skippableCB[id-0x80]; fn != nil {
   210  		rd := io.LimitReader(r.r, int64(n))
   211  		r.err = fn(rd)
   212  		if r.err != nil {
   213  			return false
   214  		}
   215  		_, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
   216  		return r.err == nil
   217  	}
   218  	if rs, ok := r.r.(io.ReadSeeker); ok {
   219  		_, err := rs.Seek(int64(n), io.SeekCurrent)
   220  		if err == nil {
   221  			return true
   222  		}
   223  		if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
   224  			r.err = ErrCorrupt
   225  			return false
   226  		}
   227  	}
   228  	for n > 0 {
   229  		if n < len(tmp) {
   230  			tmp = tmp[:n]
   231  		}
   232  		if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
   233  			if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
   234  				r.err = ErrCorrupt
   235  			}
   236  			return false
   237  		}
   238  		n -= len(tmp)
   239  	}
   240  	return true
   241  }
   242  
   243  // Read satisfies the io.Reader interface.
   244  func (r *Reader) Read(p []byte) (int, error) {
   245  	if r.err != nil {
   246  		return 0, r.err
   247  	}
   248  	for {
   249  		if r.i < r.j {
   250  			n := copy(p, r.decoded[r.i:r.j])
   251  			r.i += n
   252  			return n, nil
   253  		}
   254  		if !r.readFull(r.buf[:4], true) {
   255  			return 0, r.err
   256  		}
   257  		chunkType := r.buf[0]
   258  		if !r.readHeader {
   259  			if chunkType != chunkTypeStreamIdentifier {
   260  				r.err = ErrCorrupt
   261  				return 0, r.err
   262  			}
   263  			r.readHeader = true
   264  		}
   265  		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
   266  
   267  		// The chunk types are specified at
   268  		// https://github.com/google/snappy/blob/master/framing_format.txt
   269  		switch chunkType {
   270  		case chunkTypeCompressedData:
   271  			r.blockStart += int64(r.j)
   272  			// Section 4.2. Compressed data (chunk type 0x00).
   273  			if chunkLen < checksumSize {
   274  				r.err = ErrCorrupt
   275  				return 0, r.err
   276  			}
   277  			if !r.ensureBufferSize(chunkLen) {
   278  				if r.err == nil {
   279  					r.err = ErrUnsupported
   280  				}
   281  				return 0, r.err
   282  			}
   283  			buf := r.buf[:chunkLen]
   284  			if !r.readFull(buf, false) {
   285  				return 0, r.err
   286  			}
   287  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   288  			buf = buf[checksumSize:]
   289  
   290  			n, err := DecodedLen(buf)
   291  			if err != nil {
   292  				r.err = err
   293  				return 0, r.err
   294  			}
   295  			if r.snappyFrame && n > maxSnappyBlockSize {
   296  				r.err = ErrCorrupt
   297  				return 0, r.err
   298  			}
   299  
   300  			if n > len(r.decoded) {
   301  				if n > r.maxBlock {
   302  					r.err = ErrCorrupt
   303  					return 0, r.err
   304  				}
   305  				r.decoded = make([]byte, n)
   306  			}
   307  			if _, err := Decode(r.decoded, buf); err != nil {
   308  				r.err = err
   309  				return 0, r.err
   310  			}
   311  			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
   312  				r.err = ErrCRC
   313  				return 0, r.err
   314  			}
   315  			r.i, r.j = 0, n
   316  			continue
   317  
   318  		case chunkTypeUncompressedData:
   319  			r.blockStart += int64(r.j)
   320  			// Section 4.3. Uncompressed data (chunk type 0x01).
   321  			if chunkLen < checksumSize {
   322  				r.err = ErrCorrupt
   323  				return 0, r.err
   324  			}
   325  			if !r.ensureBufferSize(chunkLen) {
   326  				if r.err == nil {
   327  					r.err = ErrUnsupported
   328  				}
   329  				return 0, r.err
   330  			}
   331  			buf := r.buf[:checksumSize]
   332  			if !r.readFull(buf, false) {
   333  				return 0, r.err
   334  			}
   335  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   336  			// Read directly into r.decoded instead of via r.buf.
   337  			n := chunkLen - checksumSize
   338  			if r.snappyFrame && n > maxSnappyBlockSize {
   339  				r.err = ErrCorrupt
   340  				return 0, r.err
   341  			}
   342  			if n > len(r.decoded) {
   343  				if n > r.maxBlock {
   344  					r.err = ErrCorrupt
   345  					return 0, r.err
   346  				}
   347  				r.decoded = make([]byte, n)
   348  			}
   349  			if !r.readFull(r.decoded[:n], false) {
   350  				return 0, r.err
   351  			}
   352  			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
   353  				r.err = ErrCRC
   354  				return 0, r.err
   355  			}
   356  			r.i, r.j = 0, n
   357  			continue
   358  
   359  		case chunkTypeStreamIdentifier:
   360  			// Section 4.1. Stream identifier (chunk type 0xff).
   361  			if chunkLen != len(magicBody) {
   362  				r.err = ErrCorrupt
   363  				return 0, r.err
   364  			}
   365  			if !r.readFull(r.buf[:len(magicBody)], false) {
   366  				return 0, r.err
   367  			}
   368  			if string(r.buf[:len(magicBody)]) != magicBody {
   369  				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
   370  					r.err = ErrCorrupt
   371  					return 0, r.err
   372  				} else {
   373  					r.snappyFrame = true
   374  				}
   375  			} else {
   376  				r.snappyFrame = false
   377  			}
   378  			continue
   379  		}
   380  
   381  		if chunkType <= 0x7f {
   382  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
   383  			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
   384  			r.err = ErrUnsupported
   385  			return 0, r.err
   386  		}
   387  		// Section 4.4 Padding (chunk type 0xfe).
   388  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
   389  		if chunkLen > maxChunkSize {
   390  			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
   391  			r.err = ErrUnsupported
   392  			return 0, r.err
   393  		}
   394  
   395  		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
   396  		if !r.skippable(r.buf, chunkLen, false, chunkType) {
   397  			return 0, r.err
   398  		}
   399  	}
   400  }
   401  
   402  // DecodeConcurrent will decode the full stream to w.
   403  // This function should not be combined with reading, seeking or other operations.
   404  // Up to 'concurrent' goroutines will be used.
   405  // If <= 0, runtime.NumCPU will be used.
   406  // On success the number of bytes decompressed nil and is returned.
   407  // This is mainly intended for bigger streams.
   408  func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
   409  	if r.i > 0 || r.j > 0 || r.blockStart > 0 {
   410  		return 0, errors.New("DecodeConcurrent called after ")
   411  	}
   412  	if concurrent <= 0 {
   413  		concurrent = runtime.NumCPU()
   414  	}
   415  
   416  	// Write to output
   417  	var errMu sync.Mutex
   418  	var aErr error
   419  	setErr := func(e error) (ok bool) {
   420  		errMu.Lock()
   421  		defer errMu.Unlock()
   422  		if e == nil {
   423  			return aErr == nil
   424  		}
   425  		if aErr == nil {
   426  			aErr = e
   427  		}
   428  		return false
   429  	}
   430  	hasErr := func() (ok bool) {
   431  		errMu.Lock()
   432  		v := aErr != nil
   433  		errMu.Unlock()
   434  		return v
   435  	}
   436  
   437  	var aWritten int64
   438  	toRead := make(chan []byte, concurrent)
   439  	writtenBlocks := make(chan []byte, concurrent)
   440  	queue := make(chan chan []byte, concurrent)
   441  	reUse := make(chan chan []byte, concurrent)
   442  	for i := 0; i < concurrent; i++ {
   443  		toRead <- make([]byte, 0, r.maxBufSize)
   444  		writtenBlocks <- make([]byte, 0, r.maxBufSize)
   445  		reUse <- make(chan []byte, 1)
   446  	}
   447  	// Writer
   448  	var wg sync.WaitGroup
   449  	wg.Add(1)
   450  	go func() {
   451  		defer wg.Done()
   452  		for toWrite := range queue {
   453  			entry := <-toWrite
   454  			reUse <- toWrite
   455  			if hasErr() || entry == nil {
   456  				if entry != nil {
   457  					writtenBlocks <- entry
   458  				}
   459  				continue
   460  			}
   461  			if hasErr() {
   462  				writtenBlocks <- entry
   463  				continue
   464  			}
   465  			n, err := w.Write(entry)
   466  			want := len(entry)
   467  			writtenBlocks <- entry
   468  			if err != nil {
   469  				setErr(err)
   470  				continue
   471  			}
   472  			if n != want {
   473  				setErr(io.ErrShortWrite)
   474  				continue
   475  			}
   476  			aWritten += int64(n)
   477  		}
   478  	}()
   479  
   480  	defer func() {
   481  		if r.err != nil {
   482  			setErr(r.err)
   483  		} else if err != nil {
   484  			setErr(err)
   485  		}
   486  		close(queue)
   487  		wg.Wait()
   488  		if err == nil {
   489  			err = aErr
   490  		}
   491  		written = aWritten
   492  	}()
   493  
   494  	// Reader
   495  	for !hasErr() {
   496  		if !r.readFull(r.buf[:4], true) {
   497  			if r.err == io.EOF {
   498  				r.err = nil
   499  			}
   500  			return 0, r.err
   501  		}
   502  		chunkType := r.buf[0]
   503  		if !r.readHeader {
   504  			if chunkType != chunkTypeStreamIdentifier {
   505  				r.err = ErrCorrupt
   506  				return 0, r.err
   507  			}
   508  			r.readHeader = true
   509  		}
   510  		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
   511  
   512  		// The chunk types are specified at
   513  		// https://github.com/google/snappy/blob/master/framing_format.txt
   514  		switch chunkType {
   515  		case chunkTypeCompressedData:
   516  			r.blockStart += int64(r.j)
   517  			// Section 4.2. Compressed data (chunk type 0x00).
   518  			if chunkLen < checksumSize {
   519  				r.err = ErrCorrupt
   520  				return 0, r.err
   521  			}
   522  			if chunkLen > r.maxBufSize {
   523  				r.err = ErrCorrupt
   524  				return 0, r.err
   525  			}
   526  			orgBuf := <-toRead
   527  			buf := orgBuf[:chunkLen]
   528  
   529  			if !r.readFull(buf, false) {
   530  				return 0, r.err
   531  			}
   532  
   533  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   534  			buf = buf[checksumSize:]
   535  
   536  			n, err := DecodedLen(buf)
   537  			if err != nil {
   538  				r.err = err
   539  				return 0, r.err
   540  			}
   541  			if r.snappyFrame && n > maxSnappyBlockSize {
   542  				r.err = ErrCorrupt
   543  				return 0, r.err
   544  			}
   545  
   546  			if n > r.maxBlock {
   547  				r.err = ErrCorrupt
   548  				return 0, r.err
   549  			}
   550  			wg.Add(1)
   551  
   552  			decoded := <-writtenBlocks
   553  			entry := <-reUse
   554  			queue <- entry
   555  			go func() {
   556  				defer wg.Done()
   557  				decoded = decoded[:n]
   558  				_, err := Decode(decoded, buf)
   559  				toRead <- orgBuf
   560  				if err != nil {
   561  					writtenBlocks <- decoded
   562  					setErr(err)
   563  					entry <- nil
   564  					return
   565  				}
   566  				if !r.ignoreCRC && crc(decoded) != checksum {
   567  					writtenBlocks <- decoded
   568  					setErr(ErrCRC)
   569  					entry <- nil
   570  					return
   571  				}
   572  				entry <- decoded
   573  			}()
   574  			continue
   575  
   576  		case chunkTypeUncompressedData:
   577  
   578  			// Section 4.3. Uncompressed data (chunk type 0x01).
   579  			if chunkLen < checksumSize {
   580  				r.err = ErrCorrupt
   581  				return 0, r.err
   582  			}
   583  			if chunkLen > r.maxBufSize {
   584  				r.err = ErrCorrupt
   585  				return 0, r.err
   586  			}
   587  			// Grab write buffer
   588  			orgBuf := <-writtenBlocks
   589  			buf := orgBuf[:checksumSize]
   590  			if !r.readFull(buf, false) {
   591  				return 0, r.err
   592  			}
   593  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   594  			// Read content.
   595  			n := chunkLen - checksumSize
   596  
   597  			if r.snappyFrame && n > maxSnappyBlockSize {
   598  				r.err = ErrCorrupt
   599  				return 0, r.err
   600  			}
   601  			if n > r.maxBlock {
   602  				r.err = ErrCorrupt
   603  				return 0, r.err
   604  			}
   605  			// Read uncompressed
   606  			buf = orgBuf[:n]
   607  			if !r.readFull(buf, false) {
   608  				return 0, r.err
   609  			}
   610  
   611  			if !r.ignoreCRC && crc(buf) != checksum {
   612  				r.err = ErrCRC
   613  				return 0, r.err
   614  			}
   615  			entry := <-reUse
   616  			queue <- entry
   617  			entry <- buf
   618  			continue
   619  
   620  		case chunkTypeStreamIdentifier:
   621  			// Section 4.1. Stream identifier (chunk type 0xff).
   622  			if chunkLen != len(magicBody) {
   623  				r.err = ErrCorrupt
   624  				return 0, r.err
   625  			}
   626  			if !r.readFull(r.buf[:len(magicBody)], false) {
   627  				return 0, r.err
   628  			}
   629  			if string(r.buf[:len(magicBody)]) != magicBody {
   630  				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
   631  					r.err = ErrCorrupt
   632  					return 0, r.err
   633  				} else {
   634  					r.snappyFrame = true
   635  				}
   636  			} else {
   637  				r.snappyFrame = false
   638  			}
   639  			continue
   640  		}
   641  
   642  		if chunkType <= 0x7f {
   643  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
   644  			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
   645  			r.err = ErrUnsupported
   646  			return 0, r.err
   647  		}
   648  		// Section 4.4 Padding (chunk type 0xfe).
   649  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
   650  		if chunkLen > maxChunkSize {
   651  			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
   652  			r.err = ErrUnsupported
   653  			return 0, r.err
   654  		}
   655  
   656  		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
   657  		if !r.skippable(r.buf, chunkLen, false, chunkType) {
   658  			return 0, r.err
   659  		}
   660  	}
   661  	return 0, r.err
   662  }
   663  
   664  // Skip will skip n bytes forward in the decompressed output.
   665  // For larger skips this consumes less CPU and is faster than reading output and discarding it.
   666  // CRC is not checked on skipped blocks.
   667  // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
   668  // If a decoding error is encountered subsequent calls to Read will also fail.
   669  func (r *Reader) Skip(n int64) error {
   670  	if n < 0 {
   671  		return errors.New("attempted negative skip")
   672  	}
   673  	if r.err != nil {
   674  		return r.err
   675  	}
   676  
   677  	for n > 0 {
   678  		if r.i < r.j {
   679  			// Skip in buffer.
   680  			// decoded[i:j] contains decoded bytes that have not yet been passed on.
   681  			left := int64(r.j - r.i)
   682  			if left >= n {
   683  				tmp := int64(r.i) + n
   684  				if tmp > math.MaxInt32 {
   685  					return errors.New("s2: internal overflow in skip")
   686  				}
   687  				r.i = int(tmp)
   688  				return nil
   689  			}
   690  			n -= int64(r.j - r.i)
   691  			r.i = r.j
   692  		}
   693  
   694  		// Buffer empty; read blocks until we have content.
   695  		if !r.readFull(r.buf[:4], true) {
   696  			if r.err == io.EOF {
   697  				r.err = io.ErrUnexpectedEOF
   698  			}
   699  			return r.err
   700  		}
   701  		chunkType := r.buf[0]
   702  		if !r.readHeader {
   703  			if chunkType != chunkTypeStreamIdentifier {
   704  				r.err = ErrCorrupt
   705  				return r.err
   706  			}
   707  			r.readHeader = true
   708  		}
   709  		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
   710  
   711  		// The chunk types are specified at
   712  		// https://github.com/google/snappy/blob/master/framing_format.txt
   713  		switch chunkType {
   714  		case chunkTypeCompressedData:
   715  			r.blockStart += int64(r.j)
   716  			// Section 4.2. Compressed data (chunk type 0x00).
   717  			if chunkLen < checksumSize {
   718  				r.err = ErrCorrupt
   719  				return r.err
   720  			}
   721  			if !r.ensureBufferSize(chunkLen) {
   722  				if r.err == nil {
   723  					r.err = ErrUnsupported
   724  				}
   725  				return r.err
   726  			}
   727  			buf := r.buf[:chunkLen]
   728  			if !r.readFull(buf, false) {
   729  				return r.err
   730  			}
   731  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   732  			buf = buf[checksumSize:]
   733  
   734  			dLen, err := DecodedLen(buf)
   735  			if err != nil {
   736  				r.err = err
   737  				return r.err
   738  			}
   739  			if dLen > r.maxBlock {
   740  				r.err = ErrCorrupt
   741  				return r.err
   742  			}
   743  			// Check if destination is within this block
   744  			if int64(dLen) > n {
   745  				if len(r.decoded) < dLen {
   746  					r.decoded = make([]byte, dLen)
   747  				}
   748  				if _, err := Decode(r.decoded, buf); err != nil {
   749  					r.err = err
   750  					return r.err
   751  				}
   752  				if crc(r.decoded[:dLen]) != checksum {
   753  					r.err = ErrCorrupt
   754  					return r.err
   755  				}
   756  			} else {
   757  				// Skip block completely
   758  				n -= int64(dLen)
   759  				r.blockStart += int64(dLen)
   760  				dLen = 0
   761  			}
   762  			r.i, r.j = 0, dLen
   763  			continue
   764  		case chunkTypeUncompressedData:
   765  			r.blockStart += int64(r.j)
   766  			// Section 4.3. Uncompressed data (chunk type 0x01).
   767  			if chunkLen < checksumSize {
   768  				r.err = ErrCorrupt
   769  				return r.err
   770  			}
   771  			if !r.ensureBufferSize(chunkLen) {
   772  				if r.err != nil {
   773  					r.err = ErrUnsupported
   774  				}
   775  				return r.err
   776  			}
   777  			buf := r.buf[:checksumSize]
   778  			if !r.readFull(buf, false) {
   779  				return r.err
   780  			}
   781  			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
   782  			// Read directly into r.decoded instead of via r.buf.
   783  			n2 := chunkLen - checksumSize
   784  			if n2 > len(r.decoded) {
   785  				if n2 > r.maxBlock {
   786  					r.err = ErrCorrupt
   787  					return r.err
   788  				}
   789  				r.decoded = make([]byte, n2)
   790  			}
   791  			if !r.readFull(r.decoded[:n2], false) {
   792  				return r.err
   793  			}
   794  			if int64(n2) < n {
   795  				if crc(r.decoded[:n2]) != checksum {
   796  					r.err = ErrCorrupt
   797  					return r.err
   798  				}
   799  			}
   800  			r.i, r.j = 0, n2
   801  			continue
   802  		case chunkTypeStreamIdentifier:
   803  			// Section 4.1. Stream identifier (chunk type 0xff).
   804  			if chunkLen != len(magicBody) {
   805  				r.err = ErrCorrupt
   806  				return r.err
   807  			}
   808  			if !r.readFull(r.buf[:len(magicBody)], false) {
   809  				return r.err
   810  			}
   811  			if string(r.buf[:len(magicBody)]) != magicBody {
   812  				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
   813  					r.err = ErrCorrupt
   814  					return r.err
   815  				}
   816  			}
   817  
   818  			continue
   819  		}
   820  
   821  		if chunkType <= 0x7f {
   822  			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
   823  			r.err = ErrUnsupported
   824  			return r.err
   825  		}
   826  		if chunkLen > maxChunkSize {
   827  			r.err = ErrUnsupported
   828  			return r.err
   829  		}
   830  		// Section 4.4 Padding (chunk type 0xfe).
   831  		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
   832  		if !r.skippable(r.buf, chunkLen, false, chunkType) {
   833  			return r.err
   834  		}
   835  	}
   836  	return nil
   837  }
   838  
   839  // ReadSeeker wraps a Reader to provide random or forward seeking in
   840  // compressed content. Instances are obtained from Reader.ReadSeeker.
   841  type ReadSeeker struct {
   842  	*Reader
   843  	readAtMu sync.Mutex // NOTE(review): presumably serializes ReadAt calls; its use is outside this view - confirm.
   844  }
   845  
   846  // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
   847  // compatible version of the reader.
   848  // If 'random' is specified the returned io.Seeker can be used for
   849  // random seeking, otherwise only forward seeking is supported.
   850  // Enabling random seeking requires the original input to support
   851  // the io.Seeker interface.
   852  // A custom index can be specified which will be used if supplied.
   853  // When using a custom index, it will not be read from the input stream.
   854  // The ReadAt position will affect regular reads and the current position of Seek.
   855  // So using Read after ReadAt will continue from where the ReadAt stopped.
   856  // No functions should be used concurrently.
   857  // The returned ReadSeeker contains a shallow reference to the existing Reader,
   858  // meaning changes performed to one is reflected in the other.
   859  func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
   860  	// Read index if provided.
   861  	if len(index) != 0 {
   862  		if r.index == nil {
   863  			r.index = &Index{}
   864  		}
   865  		if _, err := r.index.Load(index); err != nil {
   866  			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
   867  		}
   868  	}
   869  
   870  	// Check if input is seekable
   871  	rs, ok := r.r.(io.ReadSeeker)
   872  	if !ok {
   873  		if !random {
   874  			return &ReadSeeker{Reader: r}, nil
   875  		}
   876  		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
   877  	}
   878  
   879  	if r.index != nil {
   880  		// Seekable and index, ok...
   881  		return &ReadSeeker{Reader: r}, nil
   882  	}
   883  
   884  	// Load from stream.
   885  	r.index = &Index{}
   886  
   887  	// Read current position.
   888  	pos, err := rs.Seek(0, io.SeekCurrent)
   889  	if err != nil {
   890  		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
   891  	}
   892  	err = r.index.LoadStream(rs)
   893  	if err != nil {
   894  		if err == ErrUnsupported {
   895  			// If we don't require random seeking, reset input and return.
   896  			if !random {
   897  				_, err = rs.Seek(pos, io.SeekStart)
   898  				if err != nil {
   899  					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
   900  				}
   901  				r.index = nil
   902  				return &ReadSeeker{Reader: r}, nil
   903  			}
   904  			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
   905  		}
   906  		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
   907  	}
   908  
   909  	// reset position.
   910  	_, err = rs.Seek(pos, io.SeekStart)
   911  	if err != nil {
   912  		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
   913  	}
   914  	return &ReadSeeker{Reader: r}, nil
   915  }
   916  
   917  // Seek allows seeking in compressed data.
   918  func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
   919  	if r.err != nil {
   920  		if !errors.Is(r.err, io.EOF) {
   921  			return 0, r.err
   922  		}
   923  		// Reset on EOF
   924  		r.err = nil
   925  	}
   926  
   927  	// Calculate absolute offset.
   928  	absOffset := offset
   929  
   930  	switch whence {
   931  	case io.SeekStart:
   932  	case io.SeekCurrent:
   933  		absOffset = r.blockStart + int64(r.i) + offset
   934  	case io.SeekEnd:
   935  		if r.index == nil {
   936  			return 0, ErrUnsupported
   937  		}
   938  		absOffset = r.index.TotalUncompressed + offset
   939  	default:
   940  		r.err = ErrUnsupported
   941  		return 0, r.err
   942  	}
   943  
   944  	if absOffset < 0 {
   945  		return 0, errors.New("seek before start of file")
   946  	}
   947  
   948  	if !r.readHeader {
   949  		// Make sure we read the header.
   950  		_, r.err = r.Read([]byte{})
   951  		if r.err != nil {
   952  			return 0, r.err
   953  		}
   954  	}
   955  
   956  	// If we are inside current block no need to seek.
   957  	// This includes no offset changes.
   958  	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
   959  		r.i = int(absOffset - r.blockStart)
   960  		return r.blockStart + int64(r.i), nil
   961  	}
   962  
   963  	rs, ok := r.r.(io.ReadSeeker)
   964  	if r.index == nil || !ok {
   965  		currOffset := r.blockStart + int64(r.i)
   966  		if absOffset >= currOffset {
   967  			err := r.Skip(absOffset - currOffset)
   968  			return r.blockStart + int64(r.i), err
   969  		}
   970  		return 0, ErrUnsupported
   971  	}
   972  
   973  	// We can seek and we have an index.
   974  	c, u, err := r.index.Find(absOffset)
   975  	if err != nil {
   976  		return r.blockStart + int64(r.i), err
   977  	}
   978  
   979  	// Seek to next block
   980  	_, err = rs.Seek(c, io.SeekStart)
   981  	if err != nil {
   982  		return 0, err
   983  	}
   984  
   985  	r.i = r.j                     // Remove rest of current block.
   986  	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
   987  	if u < absOffset {
   988  		// Forward inside block
   989  		return absOffset, r.Skip(absOffset - u)
   990  	}
   991  	if u > absOffset {
   992  		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
   993  	}
   994  	return absOffset, nil
   995  }
   996  
   997  // ReadAt reads len(p) bytes into p starting at offset off in the
   998  // underlying input source. It returns the number of bytes
   999  // read (0 <= n <= len(p)) and any error encountered.
  1000  //
  1001  // When ReadAt returns n < len(p), it returns a non-nil error
  1002  // explaining why more bytes were not returned. In this respect,
  1003  // ReadAt is stricter than Read.
  1004  //
  1005  // Even if ReadAt returns n < len(p), it may use all of p as scratch
  1006  // space during the call. If some data is available but not len(p) bytes,
  1007  // ReadAt blocks until either all the data is available or an error occurs.
  1008  // In this respect ReadAt is different from Read.
  1009  //
  1010  // If the n = len(p) bytes returned by ReadAt are at the end of the
  1011  // input source, ReadAt may return either err == EOF or err == nil.
  1012  //
  1013  // If ReadAt is reading from an input source with a seek offset,
  1014  // ReadAt should not affect nor be affected by the underlying
  1015  // seek offset.
  1016  //
  1017  // Clients of ReadAt can execute parallel ReadAt calls on the
  1018  // same input source. This is however not recommended.
  1019  func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
  1020  	r.readAtMu.Lock()
  1021  	defer r.readAtMu.Unlock()
  1022  	_, err := r.Seek(offset, io.SeekStart)
  1023  	if err != nil {
  1024  		return 0, err
  1025  	}
  1026  	n := 0
  1027  	for n < len(p) {
  1028  		n2, err := r.Read(p[n:])
  1029  		if err != nil {
  1030  			// This will include io.EOF
  1031  			return n + n2, err
  1032  		}
  1033  		n += n2
  1034  	}
  1035  	return n, nil
  1036  }
  1037  
  1038  // ReadByte satisfies the io.ByteReader interface.
  1039  func (r *Reader) ReadByte() (byte, error) {
  1040  	if r.err != nil {
  1041  		return 0, r.err
  1042  	}
  1043  	if r.i < r.j {
  1044  		c := r.decoded[r.i]
  1045  		r.i++
  1046  		return c, nil
  1047  	}
  1048  	var tmp [1]byte
  1049  	for i := 0; i < 10; i++ {
  1050  		n, err := r.Read(tmp[:])
  1051  		if err != nil {
  1052  			return 0, err
  1053  		}
  1054  		if n == 1 {
  1055  			return tmp[0], nil
  1056  		}
  1057  	}
  1058  	return 0, io.ErrNoProgress
  1059  }
  1060  
  1061  // SkippableCB will register a callback for chunks with the specified ID.
  1062  // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
  1063  // For each chunk with the ID, the callback is called with the content.
  1064  // Any returned non-nil error will abort decompression.
  1065  // Only one callback per ID is supported, latest sent will be used.
  1066  // Sending a nil function will disable previous callbacks.
  1067  // You can peek the stream, triggering the callback, by doing a Read with a 0
  1068  // byte buffer.
  1069  func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
  1070  	if id < 0x80 || id >= chunkTypePadding {
  1071  		return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
  1072  	}
  1073  	r.skippableCB[id-0x80] = fn
  1074  	return nil
  1075  }
  1076  

View as plain text