...

Source file src/github.com/pierrec/lz4/v4/reader.go

Documentation: github.com/pierrec/lz4/v4

     1  package lz4
     2  
import (
	"bytes"
	"errors"
	"io"

	"github.com/pierrec/lz4/v4/internal/lz4block"
	"github.com/pierrec/lz4/v4/internal/lz4errors"
	"github.com/pierrec/lz4/v4/internal/lz4stream"
)
    11  
    12  var readerStates = []aState{
    13  	noState:     newState,
    14  	errorState:  newState,
    15  	newState:    readState,
    16  	readState:   closedState,
    17  	closedState: newState,
    18  }
    19  
    20  // NewReader returns a new LZ4 frame decoder.
    21  func NewReader(r io.Reader) *Reader {
    22  	return newReader(r, false)
    23  }
    24  
    25  func newReader(r io.Reader, legacy bool) *Reader {
    26  	zr := &Reader{frame: lz4stream.NewFrame()}
    27  	zr.state.init(readerStates)
    28  	_ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
    29  	zr.Reset(r)
    30  	return zr
    31  }
    32  
    33  // Reader allows reading an LZ4 stream.
    34  type Reader struct {
    35  	state   _State
    36  	src     io.Reader        // source reader
    37  	num     int              // concurrency level
    38  	frame   *lz4stream.Frame // frame being read
    39  	data    []byte           // block buffer allocated in non concurrent mode
    40  	reads   chan []byte      // pending data
    41  	idx     int              // size of pending data
    42  	handler func(int)
    43  	cum     uint32
    44  	dict    []byte
    45  }
    46  
    47  func (*Reader) private() {}
    48  
    49  func (r *Reader) Apply(options ...Option) (err error) {
    50  	defer r.state.check(&err)
    51  	switch r.state.state {
    52  	case newState:
    53  	case errorState:
    54  		return r.state.err
    55  	default:
    56  		return lz4errors.ErrOptionClosedOrError
    57  	}
    58  	for _, o := range options {
    59  		if err = o(r); err != nil {
    60  			return
    61  		}
    62  	}
    63  	return
    64  }
    65  
    66  // Size returns the size of the underlying uncompressed data, if set in the stream.
    67  func (r *Reader) Size() int {
    68  	switch r.state.state {
    69  	case readState, closedState:
    70  		if r.frame.Descriptor.Flags.Size() {
    71  			return int(r.frame.Descriptor.ContentSize)
    72  		}
    73  	}
    74  	return 0
    75  }
    76  
    77  func (r *Reader) isNotConcurrent() bool {
    78  	return r.num == 1
    79  }
    80  
    81  func (r *Reader) init() error {
    82  	err := r.frame.ParseHeaders(r.src)
    83  	if err != nil {
    84  		return err
    85  	}
    86  	if !r.frame.Descriptor.Flags.BlockIndependence() {
    87  		// We can't decompress dependent blocks concurrently.
    88  		// Instead of throwing an error to the user, silently drop concurrency
    89  		r.num = 1
    90  	}
    91  	data, err := r.frame.InitR(r.src, r.num)
    92  	if err != nil {
    93  		return err
    94  	}
    95  	r.reads = data
    96  	r.idx = 0
    97  	size := r.frame.Descriptor.Flags.BlockSizeIndex()
    98  	r.data = size.Get()
    99  	r.cum = 0
   100  	return nil
   101  }
   102  
   103  func (r *Reader) Read(buf []byte) (n int, err error) {
   104  	defer r.state.check(&err)
   105  	switch r.state.state {
   106  	case readState:
   107  	case closedState, errorState:
   108  		return 0, r.state.err
   109  	case newState:
   110  		// First initialization.
   111  		if err = r.init(); r.state.next(err) {
   112  			return
   113  		}
   114  	default:
   115  		return 0, r.state.fail()
   116  	}
   117  	for len(buf) > 0 {
   118  		var bn int
   119  		if r.idx == 0 {
   120  			if r.isNotConcurrent() {
   121  				bn, err = r.read(buf)
   122  			} else {
   123  				lz4block.Put(r.data)
   124  				r.data = <-r.reads
   125  				if len(r.data) == 0 {
   126  					// No uncompressed data: something went wrong or we are done.
   127  					err = r.frame.Blocks.ErrorR()
   128  				}
   129  			}
   130  			switch err {
   131  			case nil:
   132  			case io.EOF:
   133  				if er := r.frame.CloseR(r.src); er != nil {
   134  					err = er
   135  				}
   136  				lz4block.Put(r.data)
   137  				r.data = nil
   138  				return
   139  			default:
   140  				return
   141  			}
   142  		}
   143  		if bn == 0 {
   144  			// Fill buf with buffered data.
   145  			bn = copy(buf, r.data[r.idx:])
   146  			r.idx += bn
   147  			if r.idx == len(r.data) {
   148  				// All data read, get ready for the next Read.
   149  				r.idx = 0
   150  			}
   151  		}
   152  		buf = buf[bn:]
   153  		n += bn
   154  		r.handler(bn)
   155  	}
   156  	return
   157  }
   158  
   159  // read uncompresses the next block as follow:
   160  // - if buf has enough room, the block is uncompressed into it directly
   161  //   and the lenght of used space is returned
   162  // - else, the uncompress data is stored in r.data and 0 is returned
   163  func (r *Reader) read(buf []byte) (int, error) {
   164  	block := r.frame.Blocks.Block
   165  	_, err := block.Read(r.frame, r.src, r.cum)
   166  	if err != nil {
   167  		return 0, err
   168  	}
   169  	var direct bool
   170  	dst := r.data[:cap(r.data)]
   171  	if len(buf) >= len(dst) {
   172  		// Uncompress directly into buf.
   173  		direct = true
   174  		dst = buf
   175  	}
   176  	dst, err = block.Uncompress(r.frame, dst, r.dict, true)
   177  	if err != nil {
   178  		return 0, err
   179  	}
   180  	if !r.frame.Descriptor.Flags.BlockIndependence() {
   181  		if len(r.dict)+len(dst) > 128*1024 {
   182  			preserveSize := 64*1024 - len(dst)
   183  			if preserveSize < 0 {
   184  				preserveSize = 0
   185  			}
   186  			r.dict = r.dict[len(r.dict)-preserveSize:]
   187  		}
   188  		r.dict = append(r.dict, dst...)
   189  	}
   190  	r.cum += uint32(len(dst))
   191  	if direct {
   192  		return len(dst), nil
   193  	}
   194  	r.data = dst
   195  	return 0, nil
   196  }
   197  
   198  // Reset clears the state of the Reader r such that it is equivalent to its
   199  // initial state from NewReader, but instead reading from reader.
   200  // No access to reader is performed.
   201  func (r *Reader) Reset(reader io.Reader) {
   202  	if r.data != nil {
   203  		lz4block.Put(r.data)
   204  		r.data = nil
   205  	}
   206  	r.frame.Reset(r.num)
   207  	r.state.reset()
   208  	r.src = reader
   209  	r.reads = nil
   210  }
   211  
   212  // WriteTo efficiently uncompresses the data from the Reader underlying source to w.
   213  func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
   214  	switch r.state.state {
   215  	case closedState, errorState:
   216  		return 0, r.state.err
   217  	case newState:
   218  		if err = r.init(); r.state.next(err) {
   219  			return
   220  		}
   221  	default:
   222  		return 0, r.state.fail()
   223  	}
   224  	defer r.state.nextd(&err)
   225  
   226  	var data []byte
   227  	if r.isNotConcurrent() {
   228  		size := r.frame.Descriptor.Flags.BlockSizeIndex()
   229  		data = size.Get()
   230  		defer lz4block.Put(data)
   231  	}
   232  	for {
   233  		var bn int
   234  		var dst []byte
   235  		if r.isNotConcurrent() {
   236  			bn, err = r.read(data)
   237  			dst = data[:bn]
   238  		} else {
   239  			lz4block.Put(dst)
   240  			dst = <-r.reads
   241  			bn = len(dst)
   242  			if bn == 0 {
   243  				// No uncompressed data: something went wrong or we are done.
   244  				err = r.frame.Blocks.ErrorR()
   245  			}
   246  		}
   247  		switch err {
   248  		case nil:
   249  		case io.EOF:
   250  			err = r.frame.CloseR(r.src)
   251  			return
   252  		default:
   253  			return
   254  		}
   255  		r.handler(bn)
   256  		bn, err = w.Write(dst)
   257  		n += int64(bn)
   258  		if err != nil {
   259  			return
   260  		}
   261  	}
   262  }
   263  
   264  // ValidFrameHeader returns a bool indicating if the given bytes slice matches a LZ4 header.
   265  func ValidFrameHeader(in []byte) (bool, error) {
   266  	f := lz4stream.NewFrame()
   267  	err := f.ParseHeaders(bytes.NewReader(in))
   268  	if err == nil {
   269  		return true, nil
   270  	}
   271  	if err == lz4errors.ErrInvalidFrame {
   272  		return false, nil
   273  	}
   274  	return false, err
   275  }
   276  

View as plain text