...

Source file src/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go

Documentation: github.com/jackc/pgx/v5/pgconn/internal/bgreader

     1  // Package bgreader provides a io.Reader that can optionally buffer reads in the background.
     2  package bgreader
     3  
     4  import (
     5  	"io"
     6  	"sync"
     7  
     8  	"github.com/jackc/pgx/v5/internal/iobufpool"
     9  )
    10  
    11  const (
    12  	StatusStopped = iota
    13  	StatusRunning
    14  	StatusStopping
    15  )
    16  
    17  // BGReader is an io.Reader that can optionally buffer reads in the background. It is safe for concurrent use.
    18  type BGReader struct {
    19  	r io.Reader
    20  
    21  	cond        *sync.Cond
    22  	status      int32
    23  	readResults []readResult
    24  }
    25  
    26  type readResult struct {
    27  	buf *[]byte
    28  	err error
    29  }
    30  
    31  // Start starts the backgrounder reader. If the background reader is already running this is a no-op. The background
    32  // reader will stop automatically when the underlying reader returns an error.
    33  func (r *BGReader) Start() {
    34  	r.cond.L.Lock()
    35  	defer r.cond.L.Unlock()
    36  
    37  	switch r.status {
    38  	case StatusStopped:
    39  		r.status = StatusRunning
    40  		go r.bgRead()
    41  	case StatusRunning:
    42  		// no-op
    43  	case StatusStopping:
    44  		r.status = StatusRunning
    45  	}
    46  }
    47  
    48  // Stop tells the background reader to stop after the in progress Read returns. It is safe to call Stop when the
    49  // background reader is not running.
    50  func (r *BGReader) Stop() {
    51  	r.cond.L.Lock()
    52  	defer r.cond.L.Unlock()
    53  
    54  	switch r.status {
    55  	case StatusStopped:
    56  		// no-op
    57  	case StatusRunning:
    58  		r.status = StatusStopping
    59  	case StatusStopping:
    60  		// no-op
    61  	}
    62  }
    63  
    64  // Status returns the current status of the background reader.
    65  func (r *BGReader) Status() int32 {
    66  	r.cond.L.Lock()
    67  	defer r.cond.L.Unlock()
    68  	return r.status
    69  }
    70  
    71  func (r *BGReader) bgRead() {
    72  	keepReading := true
    73  	for keepReading {
    74  		buf := iobufpool.Get(8192)
    75  		n, err := r.r.Read(*buf)
    76  		*buf = (*buf)[:n]
    77  
    78  		r.cond.L.Lock()
    79  		r.readResults = append(r.readResults, readResult{buf: buf, err: err})
    80  		if r.status == StatusStopping || err != nil {
    81  			r.status = StatusStopped
    82  			keepReading = false
    83  		}
    84  		r.cond.L.Unlock()
    85  		r.cond.Broadcast()
    86  	}
    87  }
    88  
    89  // Read implements the io.Reader interface.
    90  func (r *BGReader) Read(p []byte) (int, error) {
    91  	r.cond.L.Lock()
    92  	defer r.cond.L.Unlock()
    93  
    94  	if len(r.readResults) > 0 {
    95  		return r.readFromReadResults(p)
    96  	}
    97  
    98  	// There are no unread background read results and the background reader is stopped.
    99  	if r.status == StatusStopped {
   100  		return r.r.Read(p)
   101  	}
   102  
   103  	// Wait for results from the background reader
   104  	for len(r.readResults) == 0 {
   105  		r.cond.Wait()
   106  	}
   107  	return r.readFromReadResults(p)
   108  }
   109  
   110  // readBackgroundResults reads a result previously read by the background reader. r.cond.L must be held.
   111  func (r *BGReader) readFromReadResults(p []byte) (int, error) {
   112  	buf := r.readResults[0].buf
   113  	var err error
   114  
   115  	n := copy(p, *buf)
   116  	if n == len(*buf) {
   117  		err = r.readResults[0].err
   118  		iobufpool.Put(buf)
   119  		if len(r.readResults) == 1 {
   120  			r.readResults = nil
   121  		} else {
   122  			r.readResults = r.readResults[1:]
   123  		}
   124  	} else {
   125  		*buf = (*buf)[n:]
   126  		r.readResults[0].buf = buf
   127  	}
   128  
   129  	return n, err
   130  }
   131  
   132  func New(r io.Reader) *BGReader {
   133  	return &BGReader{
   134  		r: r,
   135  		cond: &sync.Cond{
   136  			L: &sync.Mutex{},
   137  		},
   138  	}
   139  }
   140  

View as plain text