...

Source file src/github.com/jackc/pgx/v5/pgproto3/chunkreader.go

Documentation: github.com/jackc/pgx/v5/pgproto3

     1  package pgproto3
     2  
     3  import (
     4  	"io"
     5  
     6  	"github.com/jackc/pgx/v5/internal/iobufpool"
     7  )
     8  
     9  // chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and
    10  // will read as much as will fit in the current buffer in a single call regardless of how large a read is actually
    11  // requested. The memory returned via Next is only valid until the next call to Next.
    12  //
    13  // This is roughly equivalent to a bufio.Reader that only uses Peek and Discard to never copy bytes.
    14  type chunkReader struct {
    15  	r io.Reader
    16  
    17  	buf    *[]byte
    18  	rp, wp int // buf read position and write position
    19  
    20  	minBufSize int
    21  }
    22  
    23  // newChunkReader creates and returns a new chunkReader for r with default configuration. If minBufSize is <= 0 it uses
    24  // a default value.
    25  func newChunkReader(r io.Reader, minBufSize int) *chunkReader {
    26  	if minBufSize <= 0 {
    27  		// By historical reasons Postgres currently has 8KB send buffer inside,
    28  		// so here we want to have at least the same size buffer.
    29  		// @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134
    30  		// @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru
    31  		//
    32  		// In addition, testing has found no benefit of any larger buffer.
    33  		minBufSize = 8192
    34  	}
    35  
    36  	return &chunkReader{
    37  		r:          r,
    38  		minBufSize: minBufSize,
    39  		buf:        iobufpool.Get(minBufSize),
    40  	}
    41  }
    42  
    43  // Next returns buf filled with the next n bytes. buf is only valid until next call of Next. If an error occurs, buf
    44  // will be nil.
    45  func (r *chunkReader) Next(n int) (buf []byte, err error) {
    46  	// Reset the buffer if it is empty
    47  	if r.rp == r.wp {
    48  		if len(*r.buf) != r.minBufSize {
    49  			iobufpool.Put(r.buf)
    50  			r.buf = iobufpool.Get(r.minBufSize)
    51  		}
    52  		r.rp = 0
    53  		r.wp = 0
    54  	}
    55  
    56  	// n bytes already in buf
    57  	if (r.wp - r.rp) >= n {
    58  		buf = (*r.buf)[r.rp : r.rp+n : r.rp+n]
    59  		r.rp += n
    60  		return buf, err
    61  	}
    62  
    63  	// buf is smaller than requested number of bytes
    64  	if len(*r.buf) < n {
    65  		bigBuf := iobufpool.Get(n)
    66  		r.wp = copy((*bigBuf), (*r.buf)[r.rp:r.wp])
    67  		r.rp = 0
    68  		iobufpool.Put(r.buf)
    69  		r.buf = bigBuf
    70  	}
    71  
    72  	// buf is large enough, but need to shift filled area to start to make enough contiguous space
    73  	minReadCount := n - (r.wp - r.rp)
    74  	if (len(*r.buf) - r.wp) < minReadCount {
    75  		r.wp = copy((*r.buf), (*r.buf)[r.rp:r.wp])
    76  		r.rp = 0
    77  	}
    78  
    79  	// Read at least the required number of bytes from the underlying io.Reader
    80  	readBytesCount, err := io.ReadAtLeast(r.r, (*r.buf)[r.wp:], minReadCount)
    81  	r.wp += readBytesCount
    82  	// fmt.Println("read", n)
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	buf = (*r.buf)[r.rp : r.rp+n : r.rp+n]
    88  	r.rp += n
    89  	return buf, nil
    90  }
    91  

View as plain text