...

Source file src/github.com/klauspost/compress/s2/cmd/internal/readahead/reader.go

Documentation: github.com/klauspost/compress/s2/cmd/internal/readahead

     1  // Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file.
     2  
     3  // Package readahead will do asynchronous read-ahead from an input io.Reader
     4  // and make the data available as an io.Reader.
     5  //
     6  // This should be fully transparent, except that once an error
     7  // has been returned from the Reader, it will not recover.
     8  //
     9  // The readahead object also fulfills the io.WriterTo interface, which
    10  // is likely to speed up copies.
    11  //
    12  // Package home: https://github.com/klauspost/readahead
    13  package readahead
    14  
    15  import (
    16  	"errors"
    17  	"fmt"
    18  	"io"
    19  )
    20  
// Defaults passed by NewReader and NewReadCloser to the sized constructors.
const (
	// DefaultBuffers is the default number of buffers used.
	DefaultBuffers = 4

	// DefaultBufferSize is the default buffer size, 1 MB (1 << 20 bytes).
	DefaultBufferSize = 1 << 20
)
    28  
// seekable embeds a *reader and adds a Seek method to it.
// The constructors return this wrapper instead of the bare reader
// when the supplied input also implements io.Seeker.
type seekable struct {
	*reader
}
    32  
// ReadSeekCloser is the interface that groups the Read, Close and Seek
// methods. It is both accepted and returned by the seekable constructors.
type ReadSeekCloser interface {
	io.ReadCloser
	io.Seeker
}
    37  
// reader is the read-ahead implementation. A background goroutine
// (started by initBuffers) fills buffers taken from reuse and hands
// them over on ready; Read and WriteTo consume them via cur.
type reader struct {
	in      io.Reader     // Input reader
	closer  io.Closer     // Optional closer, invoked once by Close
	ready   chan *buffer  // Buffers ready to be handed to the reader
	reuse   chan *buffer  // Buffers to reuse for input reading
	exit    chan struct{} // Receives a value to ask the async reader to stop
	buffers int           // Number of buffers
	size    int           // Size of each buffer
	err     error         // If an error has occurred it is here
	cur     *buffer       // Current buffer being served
	exited  chan struct{} // Channel is closed when the async reader shuts down
	bufs    [][]byte      // Backing slices, kept so Seek can reinitialize
}
    51  
    52  // NewReader returns a reader that will asynchronously read from
    53  // the supplied reader into 4 buffers of 1MB each.
    54  //
    55  // It will start reading from the input at once, maybe even before this
    56  // function has returned.
    57  //
    58  // The input can be read from the returned reader.
    59  // When done use Close() to release the buffers.
    60  // If a reader supporting the io.Seeker is given,
    61  // the returned reader will also support it.
    62  func NewReader(rd io.Reader) io.ReadCloser {
    63  	if rd == nil {
    64  		return nil
    65  	}
    66  
    67  	ret, err := NewReaderSize(rd, DefaultBuffers, DefaultBufferSize)
    68  
    69  	// Should not be possible to trigger from other packages.
    70  	if err != nil {
    71  		panic("unexpected error:" + err.Error())
    72  	}
    73  	return ret
    74  }
    75  
    76  // NewReadCloser returns a reader that will asynchronously read from
    77  // the supplied reader into 4 buffers of 1MB each.
    78  //
    79  // It will start reading from the input at once, maybe even before this
    80  // function has returned.
    81  //
    82  // The input can be read from the returned reader.
    83  // When done use Close() to release the buffers,
    84  // which will also close the supplied closer.
    85  // If a reader supporting the io.Seeker is given,
    86  // the returned reader will also support it.
    87  func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
    88  	if rd == nil {
    89  		return nil
    90  	}
    91  
    92  	ret, err := NewReadCloserSize(rd, DefaultBuffers, DefaultBufferSize)
    93  
    94  	// Should not be possible to trigger from other packages.
    95  	if err != nil {
    96  		panic("unexpected error:" + err.Error())
    97  	}
    98  	return ret
    99  }
   100  
   101  // NewReadSeeker returns a reader that will asynchronously read from
   102  // the supplied reader into 4 buffers of 1MB each.
   103  //
   104  // It will start reading from the input at once, maybe even before this
   105  // function has returned.
   106  //
   107  // The input can be read and seeked from the returned reader.
   108  // When done use Close() to release the buffers.
   109  func NewReadSeeker(rd io.ReadSeeker) ReadSeekCloser {
   110  	//Not checking for result as the input interface guarantees it's seekable
   111  	res, _ := NewReader(rd).(ReadSeekCloser)
   112  	return res
   113  }
   114  
   115  // NewReadSeekCloser returns a reader that will asynchronously read from
   116  // the supplied reader into 4 buffers of 1MB each.
   117  //
   118  // It will start reading from the input at once, maybe even before this
   119  // function has returned.
   120  //
   121  // The input can be read and seeked from the returned reader.
   122  // When done use Close() to release the buffers,
   123  // which will also close the supplied closer.
   124  func NewReadSeekCloser(rd ReadSeekCloser) ReadSeekCloser {
   125  	// Not checking for result as the input interface guarantees it's seekable
   126  	res, _ := NewReadCloser(rd).(ReadSeekCloser)
   127  	return res
   128  }
   129  
   130  // NewReaderSize returns a reader with a custom number of buffers and size.
   131  // buffers is the number of queued buffers and size is the size of each
   132  // buffer in bytes.
   133  func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) {
   134  	if size <= 0 {
   135  		return nil, fmt.Errorf("buffer size too small")
   136  	}
   137  	if buffers <= 0 {
   138  		return nil, fmt.Errorf("number of buffers too small")
   139  	}
   140  	if rd == nil {
   141  		return nil, fmt.Errorf("nil input reader supplied")
   142  	}
   143  	a := &reader{}
   144  	if _, ok := rd.(io.Seeker); ok {
   145  		res = &seekable{a}
   146  	} else {
   147  		res = a
   148  	}
   149  	a.init(rd, buffers, size)
   150  	return
   151  }
   152  
   153  // NewReaderBuffer returns a reader with a custom number of buffers and size.
   154  // All buffers must be the same size.
   155  // Buffers can be reused after Close has been called.
   156  func NewReaderBuffer(rd io.Reader, buffers [][]byte) (res io.ReadCloser, err error) {
   157  	if len(buffers) == 0 {
   158  		return nil, fmt.Errorf("number of buffers too small")
   159  	}
   160  	sz := 0
   161  	for _, buf := range buffers {
   162  		if len(buf) == 0 {
   163  			return nil, fmt.Errorf("zero size buffer sent")
   164  		}
   165  		if sz == 0 {
   166  			sz = len(buf)
   167  		}
   168  		if sz != len(buf) {
   169  			return nil, fmt.Errorf("buffers should have similar size")
   170  		}
   171  	}
   172  	if rd == nil {
   173  		return nil, fmt.Errorf("nil input reader supplied")
   174  	}
   175  	a := &reader{}
   176  	if _, ok := rd.(io.Seeker); ok {
   177  		res = &seekable{a}
   178  	} else {
   179  		res = a
   180  	}
   181  	a.initBuffers(rd, buffers, sz)
   182  
   183  	return
   184  }
   185  
   186  // NewReadCloserSize returns a reader with a custom number of buffers and size.
   187  // buffers is the number of queued buffers and size is the size of each
   188  // buffer in bytes.
   189  func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (res io.ReadCloser, err error) {
   190  	if size <= 0 {
   191  		return nil, fmt.Errorf("buffer size too small")
   192  	}
   193  	if buffers <= 0 {
   194  		return nil, fmt.Errorf("number of buffers too small")
   195  	}
   196  	if rc == nil {
   197  		return nil, fmt.Errorf("nil input reader supplied")
   198  	}
   199  	a := &reader{closer: rc}
   200  	if _, ok := rc.(io.Seeker); ok {
   201  		res = &seekable{a}
   202  	} else {
   203  		res = a
   204  	}
   205  	a.init(rc, buffers, size)
   206  	return
   207  }
   208  
   209  // NewReadCloserBuffer returns a reader with a custom number of buffers and size.
   210  // All buffers must be the same size.
   211  // Buffers can be reused after Close has been called.
   212  func NewReadCloserBuffer(rc io.ReadCloser, buffers [][]byte) (res io.ReadCloser, err error) {
   213  	if len(buffers) == 0 {
   214  		return nil, fmt.Errorf("number of buffers too small")
   215  	}
   216  	sz := 0
   217  	for _, buf := range buffers {
   218  		if len(buf) == 0 {
   219  			return nil, fmt.Errorf("zero size buffer sent")
   220  		}
   221  		if sz == 0 {
   222  			sz = len(buf)
   223  		}
   224  		if sz != len(buf) {
   225  			return nil, fmt.Errorf("buffers should have similar size")
   226  		}
   227  	}
   228  
   229  	if rc == nil {
   230  		return nil, fmt.Errorf("nil input reader supplied")
   231  	}
   232  	a := &reader{closer: rc}
   233  	if _, ok := rc.(io.Seeker); ok {
   234  		res = &seekable{a}
   235  	} else {
   236  		res = a
   237  	}
   238  	a.initBuffers(rc, buffers, sz)
   239  	return
   240  }
   241  
   242  // NewReadSeekerSize returns a reader with a custom number of buffers and size.
   243  // buffers is the number of queued buffers and size is the size of each
   244  // buffer in bytes.
   245  func NewReadSeekerSize(rd io.ReadSeeker, buffers, size int) (res ReadSeekCloser, err error) {
   246  	reader, err := NewReaderSize(rd, buffers, size)
   247  	if err != nil {
   248  		return nil, err
   249  	}
   250  	//Not checking for result as the input interface guarantees it's seekable
   251  	res, _ = reader.(ReadSeekCloser)
   252  	return
   253  }
   254  
   255  // NewReadSeekCloserSize returns a reader with a custom number of buffers and size.
   256  // buffers is the number of queued buffers and size is the size of each
   257  // buffer in bytes.
   258  func NewReadSeekCloserSize(rd ReadSeekCloser, buffers, size int) (res ReadSeekCloser, err error) {
   259  	reader, err := NewReadCloserSize(rd, buffers, size)
   260  	if err != nil {
   261  		return nil, err
   262  	}
   263  	//Not checking for result as the input interface guarantees it's seekable
   264  	res, _ = reader.(ReadSeekCloser)
   265  	return
   266  }
   267  
   268  // NewReadSeekCloserBuffer returns a reader with a custom number of buffers and size.
   269  // All buffers must be the same size.
   270  // Buffers can be reused after Close has been called.
   271  func NewReadSeekCloserBuffer(rd ReadSeekCloser, buffers [][]byte) (res ReadSeekCloser, err error) {
   272  	reader, err := NewReadCloserBuffer(rd, buffers)
   273  	if err != nil {
   274  		return nil, err
   275  	}
   276  	//Not checking for result as the input interface guarantees it's seekable
   277  	res, _ = reader.(ReadSeekCloser)
   278  	return
   279  }
   280  
   281  // initialize the reader
   282  func (a *reader) init(rd io.Reader, buffers, size int) {
   283  	x := make([]byte, buffers*size)
   284  	bufs := make([][]byte, buffers)
   285  	for i := range bufs {
   286  		bufs[i] = x[i*size : (i+1)*size : (i+1)*size]
   287  	}
   288  	a.initBuffers(rd, bufs, size)
   289  }
   290  
// initBuffers (re)initializes the reader to read from rd into the
// supplied buffers and starts the asynchronous read goroutine.
// All channels and state fields are replaced, so it is also used by
// Seek to restart the reader after the previous goroutine has exited.
// size is the length of each buffer.
func (a *reader) initBuffers(rd io.Reader, buffers [][]byte, size int) {
	a.in = rd
	// Both queues can hold every buffer, so sends on them never block.
	a.ready = make(chan *buffer, len(buffers))
	a.reuse = make(chan *buffer, len(buffers))
	a.exit = make(chan struct{})
	a.exited = make(chan struct{})
	a.buffers = len(buffers)
	a.size = size
	a.cur = nil
	a.err = nil
	a.bufs = buffers

	// Create buffers: all of them start out available for reading into.
	for _, buf := range buffers {
		a.reuse <- newBuffer(buf)
	}

	// Start async reader
	go func() {
		// Ensure that when we exit this is signalled.
		// Deferred calls run in reverse order: ready is closed first,
		// then exited, so once exited is closed ready is closed too.
		defer close(a.exited)
		defer close(a.ready)
		var atEOF bool
		for {
			select {
			case b := <-a.reuse:
				if atEOF {
					// Deliver the EOF that was held back earlier, as an
					// empty buffer carrying io.EOF, then stop.
					b.err = io.EOF
					b.buf = b.buf[:0]
					b.offset = 0
					a.ready <- b
					return
				}
				err := b.read(a.in)
				// Delay EOF if we have content: hand out the data without
				// an error now and report EOF with the next buffer.
				if err == io.EOF && len(b.buf) > 0 {
					atEOF = true
					err = nil
					b.err = nil
				}
				a.ready <- b
				// Any remaining error travels with the buffer; nothing
				// more is read after it.
				if err != nil {
					return
				}
			case <-a.exit:
				// Close or Seek asked us to stop.
				return
			}
		}
	}()
}
   343  
   344  // fill will check if the current buffer is empty and fill it if it is.
   345  // If an error was returned at the end of the current buffer it is returned.
   346  func (a *reader) fill() (err error) {
   347  	if a.cur.isEmpty() {
   348  		if a.cur != nil {
   349  			a.reuse <- a.cur
   350  			a.cur = nil
   351  		}
   352  		b, ok := <-a.ready
   353  		if !ok {
   354  			if a.err == nil {
   355  				a.err = errors.New("readahead: read after Close")
   356  			}
   357  			return a.err
   358  		}
   359  		a.cur = b
   360  	}
   361  	return nil
   362  }
   363  
   364  // Read will return the next available data.
   365  func (a *reader) Read(p []byte) (n int, err error) {
   366  	if a.err != nil {
   367  		return 0, a.err
   368  	}
   369  	// Swap buffer and maybe return error
   370  	err = a.fill()
   371  	if err != nil {
   372  		return 0, err
   373  	}
   374  
   375  	// Copy what we can
   376  	n = copy(p, a.cur.buffer())
   377  	a.cur.inc(n)
   378  
   379  	if a.cur.isEmpty() {
   380  		// Return current, so a fetch can start.
   381  		if a.cur != nil {
   382  			// If at end of buffer, return any error, if present
   383  			a.err = a.cur.err
   384  			a.reuse <- a.cur
   385  			a.cur = nil
   386  		}
   387  		return n, a.err
   388  	}
   389  	return n, nil
   390  }
   391  
   392  func (a *seekable) Seek(offset int64, whence int) (res int64, err error) {
   393  	//Not checking the result as seekable receiver guarantees it to be assertable
   394  	seeker, _ := a.in.(io.Seeker)
   395  	//Make sure the async routine is closed
   396  	select {
   397  	case <-a.exited:
   398  	case a.exit <- struct{}{}:
   399  		<-a.exited
   400  	}
   401  	if whence == io.SeekCurrent {
   402  		//If need to seek based on current position, take into consideration the bytes we read but the consumer
   403  		//doesn't know about
   404  		err = nil
   405  		for a.cur != nil {
   406  			if err = a.fill(); err == nil && a.cur != nil {
   407  				offset -= int64(len(a.cur.buffer()))
   408  				a.cur.offset = len(a.cur.buf)
   409  			}
   410  		}
   411  	}
   412  	//Seek the actual Seeker
   413  	if res, err = seeker.Seek(offset, whence); err == nil {
   414  		//If the seek was successful, reinitalize ourselves (with the new position).
   415  		a.initBuffers(a.in, a.bufs, a.size)
   416  	}
   417  	return
   418  }
   419  
   420  // WriteTo writes data to w until there's no more data to write or when an error occurs.
   421  // The return value n is the number of bytes written.
   422  // Any error encountered during the write is also returned.
   423  func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
   424  	if a.err != nil {
   425  		return 0, a.err
   426  	}
   427  	n = 0
   428  	for {
   429  		err = a.fill()
   430  		if err != nil {
   431  			return n, err
   432  		}
   433  		n2, err := w.Write(a.cur.buffer())
   434  		a.cur.inc(n2)
   435  		n += int64(n2)
   436  		if err != nil {
   437  			return n, err
   438  		}
   439  		if a.cur.err != nil {
   440  			// io.Writer should return nil if we are at EOF.
   441  			if a.cur.err == io.EOF {
   442  				a.err = a.cur.err
   443  				return n, nil
   444  			}
   445  			a.err = a.cur.err
   446  			return n, a.cur.err
   447  		}
   448  	}
   449  }
   450  
   451  // Close will ensure that the underlying async reader is shut down.
   452  // It will also close the input supplied on newAsyncReader.
   453  func (a *reader) Close() (err error) {
   454  	select {
   455  	case <-a.exited:
   456  	case a.exit <- struct{}{}:
   457  		<-a.exited
   458  	}
   459  	if a.closer != nil {
   460  		// Only call once
   461  		c := a.closer
   462  		a.closer = nil
   463  		return c.Close()
   464  	}
   465  	a.err = errors.New("readahead: read after Close")
   466  	return nil
   467  }
   468  
   469  // Internal buffer representing a single read.
   470  // If an error is present, it must be returned
   471  // once all buffer content has been served.
   472  type buffer struct {
   473  	buf    []byte
   474  	err    error
   475  	offset int
   476  	size   int
   477  }
   478  
   479  func newBuffer(buf []byte) *buffer {
   480  	return &buffer{buf: buf, err: nil, size: len(buf)}
   481  }
   482  
   483  // isEmpty returns true is offset is at end of
   484  // buffer, or if the buffer is nil
   485  func (b *buffer) isEmpty() bool {
   486  	if b == nil {
   487  		return true
   488  	}
   489  	if len(b.buf)-b.offset <= 0 {
   490  		return true
   491  	}
   492  	return false
   493  }
   494  
   495  // read into start of the buffer from the supplied reader,
   496  // resets the offset and updates the size of the buffer.
   497  // Any error encountered during the read is returned.
   498  func (b *buffer) read(rd io.Reader) (err error) {
   499  	defer func() {
   500  		if r := recover(); r != nil {
   501  			err = fmt.Errorf("panic reading: %v", r)
   502  			b.err = err
   503  		}
   504  	}()
   505  
   506  	var n int
   507  	buf := b.buf[0:b.size]
   508  	for n < b.size {
   509  		n2, err := rd.Read(buf)
   510  		n += n2
   511  		if err != nil {
   512  			b.err = err
   513  			break
   514  		}
   515  		buf = buf[n2:]
   516  	}
   517  	b.buf = b.buf[0:n]
   518  	b.offset = 0
   519  	return b.err
   520  }
   521  
   522  // Return the buffer at current offset
   523  func (b *buffer) buffer() []byte {
   524  	return b.buf[b.offset:]
   525  }
   526  
   527  // inc will increment the read offset
   528  func (b *buffer) inc(n int) {
   529  	b.offset += n
   530  }
   531  

View as plain text