...

Source file src/github.com/jackc/pgx/v4/pgxpool/pool.go

Documentation: github.com/jackc/pgx/v4/pgxpool

     1  package pgxpool
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"math/rand"
     7  	"runtime"
     8  	"strconv"
     9  	"sync"
    10  	"sync/atomic"
    11  	"time"
    12  
    13  	"github.com/jackc/pgconn"
    14  	"github.com/jackc/pgx/v4"
    15  	"github.com/jackc/puddle"
    16  )
    17  
    18  var defaultMaxConns = int32(4)
    19  var defaultMinConns = int32(0)
    20  var defaultMaxConnLifetime = time.Hour
    21  var defaultMaxConnIdleTime = time.Minute * 30
    22  var defaultHealthCheckPeriod = time.Minute
    23  
    24  type connResource struct {
    25  	conn      *pgx.Conn
    26  	conns     []Conn
    27  	poolRows  []poolRow
    28  	poolRowss []poolRows
    29  }
    30  
    31  func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn {
    32  	if len(cr.conns) == 0 {
    33  		cr.conns = make([]Conn, 128)
    34  	}
    35  
    36  	c := &cr.conns[len(cr.conns)-1]
    37  	cr.conns = cr.conns[0 : len(cr.conns)-1]
    38  
    39  	c.res = res
    40  	c.p = p
    41  
    42  	return c
    43  }
    44  
    45  func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
    46  	if len(cr.poolRows) == 0 {
    47  		cr.poolRows = make([]poolRow, 128)
    48  	}
    49  
    50  	pr := &cr.poolRows[len(cr.poolRows)-1]
    51  	cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
    52  
    53  	pr.c = c
    54  	pr.r = r
    55  
    56  	return pr
    57  }
    58  
    59  func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
    60  	if len(cr.poolRowss) == 0 {
    61  		cr.poolRowss = make([]poolRows, 128)
    62  	}
    63  
    64  	pr := &cr.poolRowss[len(cr.poolRowss)-1]
    65  	cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
    66  
    67  	pr.c = c
    68  	pr.r = r
    69  
    70  	return pr
    71  }
    72  
    73  // detachedCtx wraps a context and will never be canceled, regardless of if
    74  // the wrapped one is cancelled. The Err() method will never return any errors.
    75  type detachedCtx struct {
    76  	context.Context
    77  }
    78  
    79  func (detachedCtx) Done() <-chan struct{}       { return nil }
    80  func (detachedCtx) Deadline() (time.Time, bool) { return time.Time{}, false }
    81  func (detachedCtx) Err() error                  { return nil }
    82  
    83  // Pool allows for connection reuse.
    84  type Pool struct {
    85  	// 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
    86  	// architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288.
    87  	newConnsCount        int64
    88  	lifetimeDestroyCount int64
    89  	idleDestroyCount     int64
    90  
    91  	p                     *puddle.Pool
    92  	config                *Config
    93  	beforeConnect         func(context.Context, *pgx.ConnConfig) error
    94  	afterConnect          func(context.Context, *pgx.Conn) error
    95  	beforeAcquire         func(context.Context, *pgx.Conn) bool
    96  	afterRelease          func(*pgx.Conn) bool
    97  	minConns              int32
    98  	maxConns              int32
    99  	maxConnLifetime       time.Duration
   100  	maxConnLifetimeJitter time.Duration
   101  	maxConnIdleTime       time.Duration
   102  	healthCheckPeriod     time.Duration
   103  	healthCheckChan       chan struct{}
   104  
   105  	closeOnce sync.Once
   106  	closeChan chan struct{}
   107  }
   108  
   109  // Config is the configuration struct for creating a pool. It must be created by ParseConfig and then it can be
   110  // modified. A manually initialized ConnConfig will cause ConnectConfig to panic.
   111  type Config struct {
   112  	ConnConfig *pgx.ConnConfig
   113  
   114  	// BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and
   115  	// will not impact any existing open connections.
   116  	BeforeConnect func(context.Context, *pgx.ConnConfig) error
   117  
   118  	// AfterConnect is called after a connection is established, but before it is added to the pool.
   119  	AfterConnect func(context.Context, *pgx.Conn) error
   120  
   121  	// BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
   122  	// acquision or false to indicate that the connection should be destroyed and a different connection should be
   123  	// acquired.
   124  	BeforeAcquire func(context.Context, *pgx.Conn) bool
   125  
   126  	// AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
   127  	// return the connection to the pool or false to destroy the connection.
   128  	AfterRelease func(*pgx.Conn) bool
   129  
   130  	// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
   131  	MaxConnLifetime time.Duration
   132  
   133  	// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
   134  	// This helps prevent all connections from being closed at the exact same time, starving the pool.
   135  	MaxConnLifetimeJitter time.Duration
   136  
   137  	// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
   138  	MaxConnIdleTime time.Duration
   139  
   140  	// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
   141  	MaxConns int32
   142  
   143  	// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
   144  	// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
   145  	// to create new connections.
   146  	MinConns int32
   147  
   148  	// HealthCheckPeriod is the duration between checks of the health of idle connections.
   149  	HealthCheckPeriod time.Duration
   150  
   151  	// If set to true, pool doesn't do any I/O operation on initialization.
   152  	// And connects to the server only when the pool starts to be used.
   153  	// The default is false.
   154  	LazyConnect bool
   155  
   156  	createdByParseConfig bool // Used to enforce created by ParseConfig rule.
   157  }
   158  
   159  // Copy returns a deep copy of the config that is safe to use and modify.
   160  // The only exception is the tls.Config:
   161  // according to the tls.Config docs it must not be modified after creation.
   162  func (c *Config) Copy() *Config {
   163  	newConfig := new(Config)
   164  	*newConfig = *c
   165  	newConfig.ConnConfig = c.ConnConfig.Copy()
   166  	return newConfig
   167  }
   168  
   169  // ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
   170  func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
   171  
   172  // Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
   173  // connection. See ParseConfig for information on connString format.
   174  func Connect(ctx context.Context, connString string) (*Pool, error) {
   175  	config, err := ParseConfig(connString)
   176  	if err != nil {
   177  		return nil, err
   178  	}
   179  
   180  	return ConnectConfig(ctx, config)
   181  }
   182  
   183  // ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
   184  // connection. config must have been created by ParseConfig.
   185  func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
   186  	// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
   187  	// zero values.
   188  	if !config.createdByParseConfig {
   189  		panic("config must be created by ParseConfig")
   190  	}
   191  
   192  	p := &Pool{
   193  		config:                config,
   194  		beforeConnect:         config.BeforeConnect,
   195  		afterConnect:          config.AfterConnect,
   196  		beforeAcquire:         config.BeforeAcquire,
   197  		afterRelease:          config.AfterRelease,
   198  		minConns:              config.MinConns,
   199  		maxConns:              config.MaxConns,
   200  		maxConnLifetime:       config.MaxConnLifetime,
   201  		maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
   202  		maxConnIdleTime:       config.MaxConnIdleTime,
   203  		healthCheckPeriod:     config.HealthCheckPeriod,
   204  		healthCheckChan:       make(chan struct{}, 1),
   205  		closeChan:             make(chan struct{}),
   206  	}
   207  
   208  	p.p = puddle.NewPool(
   209  		func(ctx context.Context) (interface{}, error) {
   210  			// we ignore cancellation on the original context because its either from
   211  			// the health check or its from a query and we don't want to cancel creating
   212  			// a connection just because the original query was cancelled since that
   213  			// could end up stampeding the server
   214  			// this will keep any Values in the original context and will just ignore
   215  			// cancellation
   216  			// see https://github.com/jackc/pgx/issues/1259
   217  			ctx = detachedCtx{ctx}
   218  
   219  			connConfig := p.config.ConnConfig.Copy()
   220  
   221  			// But we do want to ensure that a connect won't hang forever.
   222  			if connConfig.ConnectTimeout <= 0 {
   223  				connConfig.ConnectTimeout = 2 * time.Minute
   224  			}
   225  
   226  			if p.beforeConnect != nil {
   227  				if err := p.beforeConnect(ctx, connConfig); err != nil {
   228  					return nil, err
   229  				}
   230  			}
   231  
   232  			conn, err := pgx.ConnectConfig(ctx, connConfig)
   233  			if err != nil {
   234  				return nil, err
   235  			}
   236  
   237  			if p.afterConnect != nil {
   238  				err = p.afterConnect(ctx, conn)
   239  				if err != nil {
   240  					conn.Close(ctx)
   241  					return nil, err
   242  				}
   243  			}
   244  
   245  			cr := &connResource{
   246  				conn:      conn,
   247  				conns:     make([]Conn, 64),
   248  				poolRows:  make([]poolRow, 64),
   249  				poolRowss: make([]poolRows, 64),
   250  			}
   251  
   252  			return cr, nil
   253  		},
   254  		func(value interface{}) {
   255  			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
   256  			conn := value.(*connResource).conn
   257  			conn.Close(ctx)
   258  			select {
   259  			case <-conn.PgConn().CleanupDone():
   260  			case <-ctx.Done():
   261  			}
   262  			cancel()
   263  		},
   264  		config.MaxConns,
   265  	)
   266  
   267  	if !config.LazyConnect {
   268  		if err := p.checkMinConnsWithContext(ctx); err != nil {
   269  			// Couldn't create resources for minpool size. Close unhealthy pool.
   270  			p.Close()
   271  			return nil, err
   272  		}
   273  
   274  		// Initially establish one connection
   275  		res, err := p.p.Acquire(ctx)
   276  		if err != nil {
   277  			p.Close()
   278  			return nil, err
   279  		}
   280  		res.Release()
   281  	}
   282  
   283  	go p.backgroundHealthCheck()
   284  
   285  	return p, nil
   286  }
   287  
   288  // ParseConfig builds a Config from connString. It parses connString with the same behavior as pgx.ParseConfig with the
   289  // addition of the following variables:
   290  //
   291  // pool_max_conns: integer greater than 0
   292  // pool_min_conns: integer 0 or greater
   293  // pool_max_conn_lifetime: duration string
   294  // pool_max_conn_idle_time: duration string
   295  // pool_health_check_period: duration string
   296  // pool_max_conn_lifetime_jitter: duration string
   297  //
   298  // See Config for definitions of these arguments.
   299  //
   300  //	# Example DSN
   301  //	user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
   302  //
   303  //	# Example URL
   304  //	postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
   305  func ParseConfig(connString string) (*Config, error) {
   306  	connConfig, err := pgx.ParseConfig(connString)
   307  	if err != nil {
   308  		return nil, err
   309  	}
   310  
   311  	config := &Config{
   312  		ConnConfig:           connConfig,
   313  		createdByParseConfig: true,
   314  	}
   315  
   316  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
   317  		delete(connConfig.Config.RuntimeParams, "pool_max_conns")
   318  		n, err := strconv.ParseInt(s, 10, 32)
   319  		if err != nil {
   320  			return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err)
   321  		}
   322  		if n < 1 {
   323  			return nil, fmt.Errorf("pool_max_conns too small: %d", n)
   324  		}
   325  		config.MaxConns = int32(n)
   326  	} else {
   327  		config.MaxConns = defaultMaxConns
   328  		if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
   329  			config.MaxConns = numCPU
   330  		}
   331  	}
   332  
   333  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
   334  		delete(connConfig.Config.RuntimeParams, "pool_min_conns")
   335  		n, err := strconv.ParseInt(s, 10, 32)
   336  		if err != nil {
   337  			return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err)
   338  		}
   339  		config.MinConns = int32(n)
   340  	} else {
   341  		config.MinConns = defaultMinConns
   342  	}
   343  
   344  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
   345  		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
   346  		d, err := time.ParseDuration(s)
   347  		if err != nil {
   348  			return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err)
   349  		}
   350  		config.MaxConnLifetime = d
   351  	} else {
   352  		config.MaxConnLifetime = defaultMaxConnLifetime
   353  	}
   354  
   355  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok {
   356  		delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time")
   357  		d, err := time.ParseDuration(s)
   358  		if err != nil {
   359  			return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err)
   360  		}
   361  		config.MaxConnIdleTime = d
   362  	} else {
   363  		config.MaxConnIdleTime = defaultMaxConnIdleTime
   364  	}
   365  
   366  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
   367  		delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
   368  		d, err := time.ParseDuration(s)
   369  		if err != nil {
   370  			return nil, fmt.Errorf("invalid pool_health_check_period: %w", err)
   371  		}
   372  		config.HealthCheckPeriod = d
   373  	} else {
   374  		config.HealthCheckPeriod = defaultHealthCheckPeriod
   375  	}
   376  
   377  	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
   378  		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
   379  		d, err := time.ParseDuration(s)
   380  		if err != nil {
   381  			return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
   382  		}
   383  		config.MaxConnLifetimeJitter = d
   384  	}
   385  
   386  	return config, nil
   387  }
   388  
   389  // Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
   390  // to pool and closed.
   391  func (p *Pool) Close() {
   392  	p.closeOnce.Do(func() {
   393  		close(p.closeChan)
   394  		p.p.Close()
   395  	})
   396  }
   397  
   398  func (p *Pool) isExpired(res *puddle.Resource) bool {
   399  	now := time.Now()
   400  	// Small optimization to avoid rand. If it's over lifetime AND jitter, immediately
   401  	// return true.
   402  	if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter {
   403  		return true
   404  	}
   405  	if p.maxConnLifetimeJitter == 0 {
   406  		return false
   407  	}
   408  	jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds()
   409  	return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second)
   410  }
   411  
   412  func (p *Pool) triggerHealthCheck() {
   413  	go func() {
   414  		// Destroy is asynchronous so we give it time to actually remove itself from
   415  		// the pool otherwise we might try to check the pool size too soon
   416  		time.Sleep(500 * time.Millisecond)
   417  		select {
   418  		case p.healthCheckChan <- struct{}{}:
   419  		default:
   420  		}
   421  	}()
   422  }
   423  
   424  func (p *Pool) backgroundHealthCheck() {
   425  	ticker := time.NewTicker(p.healthCheckPeriod)
   426  	defer ticker.Stop()
   427  	for {
   428  		select {
   429  		case <-p.closeChan:
   430  			return
   431  		case <-p.healthCheckChan:
   432  			p.checkHealth()
   433  		case <-ticker.C:
   434  			p.checkHealth()
   435  		}
   436  	}
   437  }
   438  
   439  func (p *Pool) checkHealth() {
   440  	for {
   441  		// If checkMinConns failed we don't destroy any connections since we couldn't
   442  		// even get to minConns
   443  		if err := p.checkMinConns(); err != nil {
   444  			// Should we log this error somewhere?
   445  			break
   446  		}
   447  		if !p.checkConnsHealth() {
   448  			// Since we didn't destroy any connections we can stop looping
   449  			break
   450  		}
   451  		// Technically Destroy is asynchronous but 500ms should be enough for it to
   452  		// remove it from the underlying pool
   453  		select {
   454  		case <-p.closeChan:
   455  			return
   456  		case <-time.After(500 * time.Millisecond):
   457  		}
   458  	}
   459  }
   460  
   461  // checkConnsHealth will check all idle connections, destroy a connection if
   462  // it's idle or too old, and returns true if any were destroyed
   463  func (p *Pool) checkConnsHealth() bool {
   464  	var destroyed bool
   465  	totalConns := p.Stat().TotalConns()
   466  	resources := p.p.AcquireAllIdle()
   467  	for _, res := range resources {
   468  		// We're okay going under minConns if the lifetime is up
   469  		if p.isExpired(res) && totalConns >= p.minConns {
   470  			atomic.AddInt64(&p.lifetimeDestroyCount, 1)
   471  			res.Destroy()
   472  			destroyed = true
   473  			// Since Destroy is async we manually decrement totalConns.
   474  			totalConns--
   475  		} else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
   476  			atomic.AddInt64(&p.idleDestroyCount, 1)
   477  			res.Destroy()
   478  			destroyed = true
   479  			// Since Destroy is async we manually decrement totalConns.
   480  			totalConns--
   481  		} else {
   482  			res.ReleaseUnused()
   483  		}
   484  	}
   485  	return destroyed
   486  }
   487  
   488  func (p *Pool) checkMinConnsWithContext(ctx context.Context) error {
   489  	// TotalConns can include ones that are being destroyed but we should have
   490  	// sleep(500ms) around all of the destroys to help prevent that from throwing
   491  	// off this check
   492  	toCreate := p.minConns - p.Stat().TotalConns()
   493  	if toCreate > 0 {
   494  		return p.createIdleResources(ctx, int(toCreate))
   495  	}
   496  	return nil
   497  }
   498  
   499  func (p *Pool) checkMinConns() error {
   500  	return p.checkMinConnsWithContext(context.Background())
   501  }
   502  
   503  func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
   504  	ctx, cancel := context.WithCancel(parentCtx)
   505  	defer cancel()
   506  
   507  	errs := make(chan error, targetResources)
   508  
   509  	for i := 0; i < targetResources; i++ {
   510  		go func() {
   511  			atomic.AddInt64(&p.newConnsCount, 1)
   512  			err := p.p.CreateResource(ctx)
   513  			errs <- err
   514  		}()
   515  	}
   516  
   517  	var firstError error
   518  	for i := 0; i < targetResources; i++ {
   519  		err := <-errs
   520  		if err != nil && firstError == nil {
   521  			cancel()
   522  			firstError = err
   523  		}
   524  	}
   525  
   526  	return firstError
   527  }
   528  
   529  // Acquire returns a connection (*Conn) from the Pool
   530  func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
   531  	for {
   532  		res, err := p.p.Acquire(ctx)
   533  		if err != nil {
   534  			return nil, err
   535  		}
   536  
   537  		cr := res.Value().(*connResource)
   538  		if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
   539  			return cr.getConn(p, res), nil
   540  		}
   541  
   542  		res.Destroy()
   543  	}
   544  }
   545  
   546  // AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the
   547  // call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is
   548  // automatically released after the call of f.
   549  func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error {
   550  	conn, err := p.Acquire(ctx)
   551  	if err != nil {
   552  		return err
   553  	}
   554  	defer conn.Release()
   555  
   556  	return f(conn)
   557  }
   558  
   559  // AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
   560  // keep-alive functionality. It does not update pool statistics.
   561  func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn {
   562  	resources := p.p.AcquireAllIdle()
   563  	conns := make([]*Conn, 0, len(resources))
   564  	for _, res := range resources {
   565  		cr := res.Value().(*connResource)
   566  		if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
   567  			conns = append(conns, cr.getConn(p, res))
   568  		} else {
   569  			res.Destroy()
   570  		}
   571  	}
   572  
   573  	return conns
   574  }
   575  
   576  // Config returns a copy of config that was used to initialize this pool.
   577  func (p *Pool) Config() *Config { return p.config.Copy() }
   578  
   579  // Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
   580  func (p *Pool) Stat() *Stat {
   581  	return &Stat{
   582  		s:                    p.p.Stat(),
   583  		newConnsCount:        atomic.LoadInt64(&p.newConnsCount),
   584  		lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
   585  		idleDestroyCount:     atomic.LoadInt64(&p.idleDestroyCount),
   586  	}
   587  }
   588  
   589  // Exec acquires a connection from the Pool and executes the given SQL.
   590  // SQL can be either a prepared statement name or an SQL string.
   591  // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
   592  // The acquired connection is returned to the pool when the Exec function returns.
   593  func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
   594  	c, err := p.Acquire(ctx)
   595  	if err != nil {
   596  		return nil, err
   597  	}
   598  	defer c.Release()
   599  
   600  	return c.Exec(ctx, sql, arguments...)
   601  }
   602  
   603  // Query acquires a connection and executes a query that returns pgx.Rows.
   604  // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
   605  // See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool.
   606  //
   607  // If there is an error, the returned pgx.Rows will be returned in an error state.
   608  // If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows.
   609  //
   610  // For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
   611  // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
   612  // needed. See the documentation for those types for details.
   613  func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
   614  	c, err := p.Acquire(ctx)
   615  	if err != nil {
   616  		return errRows{err: err}, err
   617  	}
   618  
   619  	rows, err := c.Query(ctx, sql, args...)
   620  	if err != nil {
   621  		c.Release()
   622  		return errRows{err: err}, err
   623  	}
   624  
   625  	return c.getPoolRows(rows), nil
   626  }
   627  
   628  // QueryRow acquires a connection and executes a query that is expected
   629  // to return at most one row (pgx.Row). Errors are deferred until pgx.Row's
   630  // Scan method is called. If the query selects no rows, pgx.Row's Scan will
   631  // return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row
   632  // and discards the rest. The acquired connection is returned to the Pool when
   633  // pgx.Row's Scan method is called.
   634  //
   635  // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
   636  //
   637  // For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
   638  // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
   639  // needed. See the documentation for those types for details.
   640  func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
   641  	c, err := p.Acquire(ctx)
   642  	if err != nil {
   643  		return errRow{err: err}
   644  	}
   645  
   646  	row := c.QueryRow(ctx, sql, args...)
   647  	return c.getPoolRow(row)
   648  }
   649  
   650  func (p *Pool) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
   651  	c, err := p.Acquire(ctx)
   652  	if err != nil {
   653  		return nil, err
   654  	}
   655  	defer c.Release()
   656  
   657  	return c.QueryFunc(ctx, sql, args, scans, f)
   658  }
   659  
   660  func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
   661  	c, err := p.Acquire(ctx)
   662  	if err != nil {
   663  		return errBatchResults{err: err}
   664  	}
   665  
   666  	br := c.SendBatch(ctx, b)
   667  	return &poolBatchResults{br: br, c: c}
   668  }
   669  
   670  // Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
   671  // auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required).
   672  // *pgxpool.Tx is returned, which implements the pgx.Tx interface.
   673  // Commit or Rollback must be called on the returned transaction to finalize the transaction block.
   674  func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) {
   675  	return p.BeginTx(ctx, pgx.TxOptions{})
   676  }
   677  
   678  // BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode.
   679  // Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation.
   680  // *pgxpool.Tx is returned, which implements the pgx.Tx interface.
   681  // Commit or Rollback must be called on the returned transaction to finalize the transaction block.
   682  func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
   683  	c, err := p.Acquire(ctx)
   684  	if err != nil {
   685  		return nil, err
   686  	}
   687  
   688  	t, err := c.BeginTx(ctx, txOptions)
   689  	if err != nil {
   690  		c.Release()
   691  		return nil, err
   692  	}
   693  
   694  	return &Tx{t: t, c: c}, nil
   695  }
   696  
   697  func (p *Pool) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error {
   698  	return p.BeginTxFunc(ctx, pgx.TxOptions{}, f)
   699  }
   700  
   701  func (p *Pool) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, f func(pgx.Tx) error) error {
   702  	c, err := p.Acquire(ctx)
   703  	if err != nil {
   704  		return err
   705  	}
   706  	defer c.Release()
   707  
   708  	return c.BeginTxFunc(ctx, txOptions, f)
   709  }
   710  
   711  func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
   712  	c, err := p.Acquire(ctx)
   713  	if err != nil {
   714  		return 0, err
   715  	}
   716  	defer c.Release()
   717  
   718  	return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
   719  }
   720  
   721  // Ping acquires a connection from the Pool and executes an empty sql statement against it.
   722  // If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned.
   723  func (p *Pool) Ping(ctx context.Context) error {
   724  	c, err := p.Acquire(ctx)
   725  	if err != nil {
   726  		return err
   727  	}
   728  	defer c.Release()
   729  	return c.Ping(ctx)
   730  }
   731  

View as plain text