...

Source file src/github.com/jackc/pgx/v5/pgxpool/pool_test.go

Documentation: github.com/jackc/pgx/v5/pgxpool

     1  package pgxpool_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"sync/atomic"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/jackc/pgx/v5"
    13  	"github.com/jackc/pgx/v5/pgxpool"
    14  	"github.com/jackc/pgx/v5/pgxtest"
    15  	"github.com/stretchr/testify/assert"
    16  	"github.com/stretchr/testify/require"
    17  )
    18  
    19  func TestConnect(t *testing.T) {
    20  	t.Parallel()
    21  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
    22  	defer cancel()
    23  	connString := os.Getenv("PGX_TEST_DATABASE")
    24  	pool, err := pgxpool.New(ctx, connString)
    25  	require.NoError(t, err)
    26  	assert.Equal(t, connString, pool.Config().ConnString())
    27  	pool.Close()
    28  }
    29  
    30  func TestConnectConfig(t *testing.T) {
    31  	t.Parallel()
    32  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
    33  	defer cancel()
    34  	connString := os.Getenv("PGX_TEST_DATABASE")
    35  	config, err := pgxpool.ParseConfig(connString)
    36  	require.NoError(t, err)
    37  	pool, err := pgxpool.NewWithConfig(ctx, config)
    38  	require.NoError(t, err)
    39  	assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config")
    40  	pool.Close()
    41  }
    42  
    43  func TestParseConfigExtractsPoolArguments(t *testing.T) {
    44  	t.Parallel()
    45  
    46  	config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1")
    47  	assert.NoError(t, err)
    48  	assert.EqualValues(t, 42, config.MaxConns)
    49  	assert.EqualValues(t, 1, config.MinConns)
    50  	assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
    51  	assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
    52  }
    53  
    54  func TestConstructorIgnoresContext(t *testing.T) {
    55  	t.Parallel()
    56  
    57  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
    58  	assert.NoError(t, err)
    59  	var cancel func()
    60  	config.BeforeConnect = func(context.Context, *pgx.ConnConfig) error {
    61  		// cancel the query's context before we actually Dial to ensure the Dial's
    62  		// context isn't cancelled
    63  		cancel()
    64  		return nil
    65  	}
    66  
    67  	pool, err := pgxpool.NewWithConfig(context.Background(), config)
    68  	require.NoError(t, err)
    69  
    70  	assert.EqualValues(t, 0, pool.Stat().TotalConns())
    71  
    72  	var ctx context.Context
    73  	ctx, cancel = context.WithCancel(context.Background())
    74  	defer cancel()
    75  	_, err = pool.Exec(ctx, "SELECT 1")
    76  	assert.ErrorIs(t, err, context.Canceled)
    77  	assert.EqualValues(t, 1, pool.Stat().TotalConns())
    78  }
    79  
    80  func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
    81  	t.Parallel()
    82  
    83  	config := &pgxpool.Config{}
    84  
    85  	require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.NewWithConfig(context.Background(), config) })
    86  }
    87  
    88  func TestConfigCopyReturnsEqualConfig(t *testing.T) {
    89  	connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
    90  	original, err := pgxpool.ParseConfig(connString)
    91  	require.NoError(t, err)
    92  
    93  	copied := original.Copy()
    94  
    95  	assertConfigsEqual(t, original, copied, t.Name())
    96  }
    97  
    98  func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
    99  	connString := os.Getenv("PGX_TEST_DATABASE")
   100  	original, err := pgxpool.ParseConfig(connString)
   101  	require.NoError(t, err)
   102  
   103  	copied := original.Copy()
   104  	assert.NotPanics(t, func() {
   105  		_, err = pgxpool.NewWithConfig(context.Background(), copied)
   106  	})
   107  	assert.NoError(t, err)
   108  }
   109  
   110  func TestPoolAcquireAndConnRelease(t *testing.T) {
   111  	t.Parallel()
   112  
   113  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   114  	defer cancel()
   115  
   116  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   117  	require.NoError(t, err)
   118  	defer pool.Close()
   119  
   120  	c, err := pool.Acquire(ctx)
   121  	require.NoError(t, err)
   122  	c.Release()
   123  }
   124  
   125  func TestPoolAcquireAndConnHijack(t *testing.T) {
   126  	t.Parallel()
   127  
   128  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   129  	defer cancel()
   130  
   131  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   132  	require.NoError(t, err)
   133  	defer pool.Close()
   134  
   135  	c, err := pool.Acquire(ctx)
   136  	require.NoError(t, err)
   137  
   138  	connsBeforeHijack := pool.Stat().TotalConns()
   139  
   140  	conn := c.Hijack()
   141  	defer conn.Close(ctx)
   142  
   143  	connsAfterHijack := pool.Stat().TotalConns()
   144  	require.Equal(t, connsBeforeHijack-1, connsAfterHijack)
   145  
   146  	var n int32
   147  	err = conn.QueryRow(ctx, `select 1`).Scan(&n)
   148  	require.NoError(t, err)
   149  	require.Equal(t, int32(1), n)
   150  }
   151  
   152  func TestPoolAcquireChecksIdleConns(t *testing.T) {
   153  	t.Parallel()
   154  
   155  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   156  	defer cancel()
   157  
   158  	controllerConn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   159  	require.NoError(t, err)
   160  	defer controllerConn.Close(ctx)
   161  	pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
   162  
   163  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   164  	require.NoError(t, err)
   165  	defer pool.Close()
   166  
   167  	var conns []*pgxpool.Conn
   168  	for i := 0; i < 3; i++ {
   169  		c, err := pool.Acquire(ctx)
   170  		require.NoError(t, err)
   171  		conns = append(conns, c)
   172  	}
   173  
   174  	require.EqualValues(t, 3, pool.Stat().TotalConns())
   175  
   176  	var pids []uint32
   177  	for _, c := range conns {
   178  		pids = append(pids, c.Conn().PgConn().PID())
   179  		c.Release()
   180  	}
   181  
   182  	_, err = controllerConn.Exec(ctx, `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids)
   183  	require.NoError(t, err)
   184  
   185  	// All conns are dead they don't know it and neither does the pool.
   186  	require.EqualValues(t, 3, pool.Stat().TotalConns())
   187  
   188  	// Wait long enough so the pool will realize it needs to check the connections.
   189  	time.Sleep(time.Second)
   190  
   191  	// Pool should try all existing connections and find them dead, then create a new connection which should successfully ping.
   192  	err = pool.Ping(ctx)
   193  	require.NoError(t, err)
   194  
   195  	// The original 3 conns should have been terminated and the a new conn established for the ping.
   196  	require.EqualValues(t, 1, pool.Stat().TotalConns())
   197  	c, err := pool.Acquire(ctx)
   198  	require.NoError(t, err)
   199  
   200  	cPID := c.Conn().PgConn().PID()
   201  	c.Release()
   202  
   203  	require.NotContains(t, pids, cPID)
   204  }
   205  
   206  func TestPoolAcquireFunc(t *testing.T) {
   207  	t.Parallel()
   208  
   209  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   210  	defer cancel()
   211  
   212  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   213  	require.NoError(t, err)
   214  	defer pool.Close()
   215  
   216  	var n int32
   217  	err = pool.AcquireFunc(ctx, func(c *pgxpool.Conn) error {
   218  		return c.QueryRow(ctx, "select 1").Scan(&n)
   219  	})
   220  	require.NoError(t, err)
   221  	require.EqualValues(t, 1, n)
   222  }
   223  
   224  func TestPoolAcquireFuncReturnsFnError(t *testing.T) {
   225  	t.Parallel()
   226  
   227  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   228  	defer cancel()
   229  
   230  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   231  	require.NoError(t, err)
   232  	defer pool.Close()
   233  
   234  	err = pool.AcquireFunc(ctx, func(c *pgxpool.Conn) error {
   235  		return fmt.Errorf("some error")
   236  	})
   237  	require.EqualError(t, err, "some error")
   238  }
   239  
   240  func TestPoolBeforeConnect(t *testing.T) {
   241  	t.Parallel()
   242  
   243  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   244  	defer cancel()
   245  
   246  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   247  	require.NoError(t, err)
   248  
   249  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
   250  		cfg.Config.RuntimeParams["application_name"] = "pgx"
   251  		return nil
   252  	}
   253  
   254  	db, err := pgxpool.NewWithConfig(ctx, config)
   255  	require.NoError(t, err)
   256  	defer db.Close()
   257  
   258  	var str string
   259  	err = db.QueryRow(ctx, "SHOW application_name").Scan(&str)
   260  	require.NoError(t, err)
   261  	assert.EqualValues(t, "pgx", str)
   262  }
   263  
   264  func TestPoolAfterConnect(t *testing.T) {
   265  	t.Parallel()
   266  
   267  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   268  	defer cancel()
   269  
   270  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   271  	require.NoError(t, err)
   272  
   273  	config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
   274  		_, err := c.Prepare(ctx, "ps1", "select 1")
   275  		return err
   276  	}
   277  
   278  	db, err := pgxpool.NewWithConfig(ctx, config)
   279  	require.NoError(t, err)
   280  	defer db.Close()
   281  
   282  	var n int32
   283  	err = db.QueryRow(ctx, "ps1").Scan(&n)
   284  	require.NoError(t, err)
   285  	assert.EqualValues(t, 1, n)
   286  }
   287  
   288  func TestPoolBeforeAcquire(t *testing.T) {
   289  	t.Parallel()
   290  
   291  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   292  	defer cancel()
   293  
   294  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   295  	require.NoError(t, err)
   296  
   297  	acquireAttempts := 0
   298  
   299  	config.BeforeAcquire = func(ctx context.Context, c *pgx.Conn) bool {
   300  		acquireAttempts++
   301  		return acquireAttempts%2 == 0
   302  	}
   303  
   304  	db, err := pgxpool.NewWithConfig(ctx, config)
   305  	require.NoError(t, err)
   306  	defer db.Close()
   307  
   308  	conns := make([]*pgxpool.Conn, 4)
   309  	for i := range conns {
   310  		conns[i], err = db.Acquire(ctx)
   311  		assert.NoError(t, err)
   312  	}
   313  
   314  	for _, c := range conns {
   315  		c.Release()
   316  	}
   317  	waitForReleaseToComplete()
   318  
   319  	assert.EqualValues(t, 8, acquireAttempts)
   320  
   321  	conns = db.AcquireAllIdle(ctx)
   322  	assert.Len(t, conns, 2)
   323  
   324  	for _, c := range conns {
   325  		c.Release()
   326  	}
   327  	waitForReleaseToComplete()
   328  
   329  	assert.EqualValues(t, 12, acquireAttempts)
   330  }
   331  
   332  func TestPoolAfterRelease(t *testing.T) {
   333  	t.Parallel()
   334  
   335  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   336  	defer cancel()
   337  
   338  	func() {
   339  		pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   340  		require.NoError(t, err)
   341  		defer pool.Close()
   342  	}()
   343  
   344  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   345  	require.NoError(t, err)
   346  
   347  	afterReleaseCount := 0
   348  
   349  	config.AfterRelease = func(c *pgx.Conn) bool {
   350  		afterReleaseCount++
   351  		return afterReleaseCount%2 == 1
   352  	}
   353  
   354  	db, err := pgxpool.NewWithConfig(ctx, config)
   355  	require.NoError(t, err)
   356  	defer db.Close()
   357  
   358  	connPIDs := map[uint32]struct{}{}
   359  
   360  	for i := 0; i < 10; i++ {
   361  		conn, err := db.Acquire(ctx)
   362  		assert.NoError(t, err)
   363  		connPIDs[conn.Conn().PgConn().PID()] = struct{}{}
   364  		conn.Release()
   365  		waitForReleaseToComplete()
   366  	}
   367  
   368  	assert.EqualValues(t, 5, len(connPIDs))
   369  }
   370  
   371  func TestPoolBeforeClose(t *testing.T) {
   372  	t.Parallel()
   373  
   374  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   375  	defer cancel()
   376  
   377  	func() {
   378  		pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   379  		require.NoError(t, err)
   380  		defer pool.Close()
   381  	}()
   382  
   383  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   384  	require.NoError(t, err)
   385  
   386  	connPIDs := make(chan uint32, 5)
   387  	config.BeforeClose = func(c *pgx.Conn) {
   388  		connPIDs <- c.PgConn().PID()
   389  	}
   390  
   391  	db, err := pgxpool.NewWithConfig(ctx, config)
   392  	require.NoError(t, err)
   393  	defer db.Close()
   394  
   395  	acquiredPIDs := make([]uint32, 0, 5)
   396  	closedPIDs := make([]uint32, 0, 5)
   397  	for i := 0; i < 5; i++ {
   398  		conn, err := db.Acquire(ctx)
   399  		assert.NoError(t, err)
   400  		acquiredPIDs = append(acquiredPIDs, conn.Conn().PgConn().PID())
   401  		conn.Release()
   402  		db.Reset()
   403  		closedPIDs = append(closedPIDs, <-connPIDs)
   404  	}
   405  
   406  	assert.ElementsMatch(t, acquiredPIDs, closedPIDs)
   407  }
   408  
   409  func TestPoolAcquireAllIdle(t *testing.T) {
   410  	t.Parallel()
   411  
   412  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   413  	defer cancel()
   414  
   415  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   416  	require.NoError(t, err)
   417  	defer db.Close()
   418  
   419  	conns := make([]*pgxpool.Conn, 3)
   420  	for i := range conns {
   421  		conns[i], err = db.Acquire(ctx)
   422  		assert.NoError(t, err)
   423  	}
   424  
   425  	for _, c := range conns {
   426  		if c != nil {
   427  			c.Release()
   428  		}
   429  	}
   430  	waitForReleaseToComplete()
   431  
   432  	conns = db.AcquireAllIdle(ctx)
   433  	assert.Len(t, conns, 3)
   434  
   435  	for _, c := range conns {
   436  		c.Release()
   437  	}
   438  }
   439  
   440  func TestPoolReset(t *testing.T) {
   441  	t.Parallel()
   442  
   443  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   444  	defer cancel()
   445  
   446  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   447  	require.NoError(t, err)
   448  	defer db.Close()
   449  
   450  	conns := make([]*pgxpool.Conn, 3)
   451  	for i := range conns {
   452  		conns[i], err = db.Acquire(ctx)
   453  		assert.NoError(t, err)
   454  	}
   455  
   456  	db.Reset()
   457  
   458  	for _, c := range conns {
   459  		if c != nil {
   460  			c.Release()
   461  		}
   462  	}
   463  	waitForReleaseToComplete()
   464  
   465  	require.EqualValues(t, 0, db.Stat().TotalConns())
   466  }
   467  
   468  func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
   469  	t.Parallel()
   470  
   471  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   472  	defer cancel()
   473  
   474  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   475  	require.NoError(t, err)
   476  
   477  	config.MaxConnLifetime = 250 * time.Millisecond
   478  
   479  	db, err := pgxpool.NewWithConfig(ctx, config)
   480  	require.NoError(t, err)
   481  	defer db.Close()
   482  
   483  	c, err := db.Acquire(ctx)
   484  	require.NoError(t, err)
   485  
   486  	time.Sleep(config.MaxConnLifetime)
   487  
   488  	c.Release()
   489  	waitForReleaseToComplete()
   490  
   491  	stats := db.Stat()
   492  	assert.EqualValues(t, 0, stats.TotalConns())
   493  }
   494  
   495  func TestConnReleaseClosesBusyConn(t *testing.T) {
   496  	t.Parallel()
   497  
   498  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   499  	defer cancel()
   500  
   501  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   502  	require.NoError(t, err)
   503  	defer db.Close()
   504  
   505  	c, err := db.Acquire(ctx)
   506  	require.NoError(t, err)
   507  
   508  	_, err = c.Query(ctx, "select generate_series(1,10)")
   509  	require.NoError(t, err)
   510  
   511  	c.Release()
   512  	waitForReleaseToComplete()
   513  
   514  	// wait for the connection to actually be destroyed
   515  	for i := 0; i < 1000; i++ {
   516  		if db.Stat().TotalConns() == 0 {
   517  			break
   518  		}
   519  		time.Sleep(time.Millisecond)
   520  	}
   521  
   522  	stats := db.Stat()
   523  	assert.EqualValues(t, 0, stats.TotalConns())
   524  }
   525  
   526  func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
   527  	t.Parallel()
   528  
   529  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   530  	defer cancel()
   531  
   532  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   533  	require.NoError(t, err)
   534  
   535  	config.MaxConnLifetime = 100 * time.Millisecond
   536  	config.HealthCheckPeriod = 100 * time.Millisecond
   537  
   538  	db, err := pgxpool.NewWithConfig(ctx, config)
   539  	require.NoError(t, err)
   540  	defer db.Close()
   541  
   542  	c, err := db.Acquire(ctx)
   543  	require.NoError(t, err)
   544  	c.Release()
   545  	time.Sleep(config.MaxConnLifetime + 500*time.Millisecond)
   546  
   547  	stats := db.Stat()
   548  	assert.EqualValues(t, 0, stats.TotalConns())
   549  	assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
   550  	assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount())
   551  	assert.EqualValues(t, 1, stats.NewConnsCount())
   552  }
   553  
   554  func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
   555  	t.Parallel()
   556  
   557  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   558  	defer cancel()
   559  
   560  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   561  	require.NoError(t, err)
   562  
   563  	config.MaxConnLifetime = 1 * time.Minute
   564  	config.MaxConnIdleTime = 100 * time.Millisecond
   565  	config.HealthCheckPeriod = 150 * time.Millisecond
   566  
   567  	db, err := pgxpool.NewWithConfig(ctx, config)
   568  	require.NoError(t, err)
   569  	defer db.Close()
   570  
   571  	c, err := db.Acquire(ctx)
   572  	require.NoError(t, err)
   573  	c.Release()
   574  	time.Sleep(config.HealthCheckPeriod)
   575  
   576  	for i := 0; i < 1000; i++ {
   577  		if db.Stat().TotalConns() == 0 {
   578  			break
   579  		}
   580  		time.Sleep(time.Millisecond)
   581  	}
   582  
   583  	stats := db.Stat()
   584  	assert.EqualValues(t, 0, stats.TotalConns())
   585  	assert.EqualValues(t, 1, stats.MaxIdleDestroyCount())
   586  	assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
   587  	assert.EqualValues(t, 1, stats.NewConnsCount())
   588  }
   589  
   590  func TestPoolBackgroundChecksMinConns(t *testing.T) {
   591  	t.Parallel()
   592  
   593  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   594  	defer cancel()
   595  
   596  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   597  	require.NoError(t, err)
   598  
   599  	config.HealthCheckPeriod = 100 * time.Millisecond
   600  	config.MinConns = 2
   601  
   602  	db, err := pgxpool.NewWithConfig(ctx, config)
   603  	require.NoError(t, err)
   604  	defer db.Close()
   605  
   606  	stats := db.Stat()
   607  	for !(stats.IdleConns() == 2 && stats.MaxLifetimeDestroyCount() == 0 && stats.NewConnsCount() == 2) && ctx.Err() == nil {
   608  		time.Sleep(50 * time.Millisecond)
   609  		stats = db.Stat()
   610  	}
   611  	require.EqualValues(t, 2, stats.IdleConns())
   612  	require.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
   613  	require.EqualValues(t, 2, stats.NewConnsCount())
   614  
   615  	c, err := db.Acquire(ctx)
   616  	require.NoError(t, err)
   617  
   618  	stats = db.Stat()
   619  	require.EqualValues(t, 1, stats.IdleConns())
   620  	require.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
   621  	require.EqualValues(t, 2, stats.NewConnsCount())
   622  
   623  	err = c.Conn().Close(ctx)
   624  	require.NoError(t, err)
   625  	c.Release()
   626  
   627  	stats = db.Stat()
   628  	for !(stats.IdleConns() == 2 && stats.MaxIdleDestroyCount() == 0 && stats.NewConnsCount() == 3) && ctx.Err() == nil {
   629  		time.Sleep(50 * time.Millisecond)
   630  		stats = db.Stat()
   631  	}
   632  	require.EqualValues(t, 2, stats.TotalConns())
   633  	require.EqualValues(t, 0, stats.MaxIdleDestroyCount())
   634  	require.EqualValues(t, 3, stats.NewConnsCount())
   635  }
   636  
   637  func TestPoolExec(t *testing.T) {
   638  	t.Parallel()
   639  
   640  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   641  	defer cancel()
   642  
   643  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   644  	require.NoError(t, err)
   645  	defer pool.Close()
   646  
   647  	testExec(t, ctx, pool)
   648  }
   649  
   650  func TestPoolQuery(t *testing.T) {
   651  	t.Parallel()
   652  
   653  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   654  	defer cancel()
   655  
   656  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   657  	require.NoError(t, err)
   658  	defer pool.Close()
   659  
   660  	// Test common usage
   661  	testQuery(t, ctx, pool)
   662  	waitForReleaseToComplete()
   663  
   664  	// Test expected pool behavior
   665  	rows, err := pool.Query(ctx, "select generate_series(1,$1)", 10)
   666  	require.NoError(t, err)
   667  
   668  	stats := pool.Stat()
   669  	assert.EqualValues(t, 1, stats.AcquiredConns())
   670  	assert.EqualValues(t, 1, stats.TotalConns())
   671  
   672  	rows.Close()
   673  	assert.NoError(t, rows.Err())
   674  	waitForReleaseToComplete()
   675  
   676  	stats = pool.Stat()
   677  	assert.EqualValues(t, 0, stats.AcquiredConns())
   678  	assert.EqualValues(t, 1, stats.TotalConns())
   679  
   680  }
   681  
   682  func TestPoolQueryRow(t *testing.T) {
   683  	t.Parallel()
   684  
   685  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   686  	defer cancel()
   687  
   688  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   689  	require.NoError(t, err)
   690  	defer pool.Close()
   691  
   692  	testQueryRow(t, ctx, pool)
   693  	waitForReleaseToComplete()
   694  
   695  	stats := pool.Stat()
   696  	assert.EqualValues(t, 0, stats.AcquiredConns())
   697  	assert.EqualValues(t, 1, stats.TotalConns())
   698  }
   699  
   700  // https://github.com/jackc/pgx/issues/677
   701  func TestPoolQueryRowErrNoRows(t *testing.T) {
   702  	t.Parallel()
   703  
   704  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   705  	defer cancel()
   706  
   707  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   708  	require.NoError(t, err)
   709  	defer pool.Close()
   710  
   711  	err = pool.QueryRow(ctx, "select n from generate_series(1,10) n where n=0").Scan(nil)
   712  	require.Equal(t, pgx.ErrNoRows, err)
   713  }
   714  
   715  // https://github.com/jackc/pgx/issues/1628
   716  func TestPoolQueryRowScanPanicReleasesConnection(t *testing.T) {
   717  	t.Parallel()
   718  
   719  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   720  	defer cancel()
   721  
   722  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   723  	require.NoError(t, err)
   724  	defer pool.Close()
   725  
   726  	require.Panics(t, func() {
   727  		var greeting *string
   728  		pool.QueryRow(ctx, "select 'Hello, world!'").Scan(greeting) // Note lack of &. This means that a typed nil is passed to Scan.
   729  	})
   730  
   731  	// If the connection is not released this will block forever in the defer pool.Close().
   732  }
   733  
   734  func TestPoolSendBatch(t *testing.T) {
   735  	t.Parallel()
   736  
   737  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   738  	defer cancel()
   739  
   740  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   741  	require.NoError(t, err)
   742  	defer pool.Close()
   743  
   744  	testSendBatch(t, ctx, pool)
   745  	waitForReleaseToComplete()
   746  
   747  	stats := pool.Stat()
   748  	assert.EqualValues(t, 0, stats.AcquiredConns())
   749  	assert.EqualValues(t, 1, stats.TotalConns())
   750  }
   751  
   752  func TestPoolCopyFrom(t *testing.T) {
   753  	// Not able to use testCopyFrom because it relies on temporary tables and the pool may run subsequent calls under
   754  	// different connections.
   755  	t.Parallel()
   756  
   757  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   758  	defer cancel()
   759  
   760  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   761  	require.NoError(t, err)
   762  	defer pool.Close()
   763  
   764  	_, err = pool.Exec(ctx, `drop table if exists poolcopyfromtest`)
   765  	require.NoError(t, err)
   766  
   767  	_, err = pool.Exec(ctx, `create table poolcopyfromtest(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
   768  	require.NoError(t, err)
   769  	defer pool.Exec(ctx, `drop table poolcopyfromtest`)
   770  
   771  	tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
   772  
   773  	inputRows := [][]any{
   774  		{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
   775  		{nil, nil, nil, nil, nil, nil, nil},
   776  	}
   777  
   778  	copyCount, err := pool.CopyFrom(ctx, pgx.Identifier{"poolcopyfromtest"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
   779  	assert.NoError(t, err)
   780  	assert.EqualValues(t, len(inputRows), copyCount)
   781  
   782  	rows, err := pool.Query(ctx, "select * from poolcopyfromtest")
   783  	assert.NoError(t, err)
   784  
   785  	var outputRows [][]any
   786  	for rows.Next() {
   787  		row, err := rows.Values()
   788  		if err != nil {
   789  			t.Errorf("Unexpected error for rows.Values(): %v", err)
   790  		}
   791  		outputRows = append(outputRows, row)
   792  	}
   793  
   794  	assert.NoError(t, rows.Err())
   795  	assert.Equal(t, inputRows, outputRows)
   796  }
   797  
   798  func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
   799  	t.Parallel()
   800  
   801  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   802  	defer cancel()
   803  
   804  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   805  	require.NoError(t, err)
   806  	defer pool.Close()
   807  
   808  	c, err := pool.Acquire(ctx)
   809  	require.NoError(t, err)
   810  
   811  	pid := c.Conn().PgConn().PID()
   812  
   813  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   814  
   815  	_, err = c.Exec(ctx, "begin")
   816  	assert.NoError(t, err)
   817  
   818  	assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
   819  
   820  	_, err = c.Exec(ctx, "selct")
   821  	assert.Error(t, err)
   822  
   823  	assert.Equal(t, byte('E'), c.Conn().PgConn().TxStatus())
   824  
   825  	c.Release()
   826  	waitForReleaseToComplete()
   827  
   828  	c, err = pool.Acquire(ctx)
   829  	require.NoError(t, err)
   830  
   831  	assert.NotEqual(t, pid, c.Conn().PgConn().PID())
   832  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   833  
   834  	c.Release()
   835  }
   836  
   837  func TestConnReleaseClosesConnInTransaction(t *testing.T) {
   838  	t.Parallel()
   839  
   840  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   841  	defer cancel()
   842  
   843  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   844  	require.NoError(t, err)
   845  	defer pool.Close()
   846  
   847  	c, err := pool.Acquire(ctx)
   848  	require.NoError(t, err)
   849  
   850  	pid := c.Conn().PgConn().PID()
   851  
   852  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   853  
   854  	_, err = c.Exec(ctx, "begin")
   855  	assert.NoError(t, err)
   856  
   857  	assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
   858  
   859  	c.Release()
   860  	waitForReleaseToComplete()
   861  
   862  	c, err = pool.Acquire(ctx)
   863  	require.NoError(t, err)
   864  
   865  	assert.NotEqual(t, pid, c.Conn().PgConn().PID())
   866  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   867  
   868  	c.Release()
   869  }
   870  
   871  func TestConnReleaseDestroysClosedConn(t *testing.T) {
   872  	t.Parallel()
   873  
   874  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   875  	defer cancel()
   876  
   877  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   878  	require.NoError(t, err)
   879  	defer pool.Close()
   880  
   881  	c, err := pool.Acquire(ctx)
   882  	require.NoError(t, err)
   883  
   884  	err = c.Conn().Close(ctx)
   885  	require.NoError(t, err)
   886  
   887  	assert.EqualValues(t, 1, pool.Stat().TotalConns())
   888  
   889  	c.Release()
   890  	waitForReleaseToComplete()
   891  
   892  	// wait for the connection to actually be destroyed
   893  	for i := 0; i < 1000; i++ {
   894  		if pool.Stat().TotalConns() == 0 {
   895  			break
   896  		}
   897  		time.Sleep(time.Millisecond)
   898  	}
   899  
   900  	assert.EqualValues(t, 0, pool.Stat().TotalConns())
   901  }
   902  
   903  func TestConnPoolQueryConcurrentLoad(t *testing.T) {
   904  	t.Parallel()
   905  
   906  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   907  	defer cancel()
   908  
   909  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   910  	require.NoError(t, err)
   911  	defer pool.Close()
   912  
   913  	n := 100
   914  	done := make(chan bool)
   915  
   916  	for i := 0; i < n; i++ {
   917  		go func() {
   918  			defer func() { done <- true }()
   919  			testQuery(t, ctx, pool)
   920  			testQueryRow(t, ctx, pool)
   921  		}()
   922  	}
   923  
   924  	for i := 0; i < n; i++ {
   925  		<-done
   926  	}
   927  }
   928  
   929  func TestConnReleaseWhenBeginFail(t *testing.T) {
   930  	t.Parallel()
   931  
   932  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   933  	defer cancel()
   934  
   935  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   936  	require.NoError(t, err)
   937  	defer db.Close()
   938  
   939  	tx, err := db.BeginTx(ctx, pgx.TxOptions{
   940  		IsoLevel: pgx.TxIsoLevel("foo"),
   941  	})
   942  	assert.Error(t, err)
   943  	if !assert.Zero(t, tx) {
   944  		err := tx.Rollback(ctx)
   945  		assert.NoError(t, err)
   946  	}
   947  
   948  	for i := 0; i < 1000; i++ {
   949  		if db.Stat().TotalConns() == 0 {
   950  			break
   951  		}
   952  		time.Sleep(time.Millisecond)
   953  	}
   954  
   955  	assert.EqualValues(t, 0, db.Stat().TotalConns())
   956  }
   957  
   958  func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
   959  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   960  	defer cancel()
   961  
   962  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
   963  	require.NoError(t, err)
   964  	defer db.Close()
   965  
   966  	createSql := `
   967  		drop table if exists pgxpooltx;
   968      create temporary table pgxpooltx(
   969        id integer,
   970        unique (id)
   971      );
   972    `
   973  
   974  	_, err = db.Exec(ctx, createSql)
   975  	require.NoError(t, err)
   976  
   977  	defer func() {
   978  		db.Exec(ctx, "drop table pgxpooltx")
   979  	}()
   980  
   981  	err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
   982  		_, err := db.Exec(ctx, "insert into pgxpooltx(id) values (1)")
   983  		require.NoError(t, err)
   984  
   985  		err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
   986  			_, err := db.Exec(ctx, "insert into pgxpooltx(id) values (2)")
   987  			require.NoError(t, err)
   988  
   989  			err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
   990  				_, err := db.Exec(ctx, "insert into pgxpooltx(id) values (3)")
   991  				require.NoError(t, err)
   992  				return nil
   993  			})
   994  			require.NoError(t, err)
   995  			return nil
   996  		})
   997  		require.NoError(t, err)
   998  		return nil
   999  	})
  1000  	require.NoError(t, err)
  1001  
  1002  	var n int64
  1003  	err = db.QueryRow(ctx, "select count(*) from pgxpooltx").Scan(&n)
  1004  	require.NoError(t, err)
  1005  	require.EqualValues(t, 3, n)
  1006  }
  1007  
  1008  func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
  1009  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1010  	defer cancel()
  1011  
  1012  	db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
  1013  	require.NoError(t, err)
  1014  	defer db.Close()
  1015  
  1016  	createSql := `
  1017  		drop table if exists pgxpooltx;
  1018      create temporary table pgxpooltx(
  1019        id integer,
  1020        unique (id)
  1021      );
  1022    `
  1023  
  1024  	_, err = db.Exec(ctx, createSql)
  1025  	require.NoError(t, err)
  1026  
  1027  	defer func() {
  1028  		db.Exec(ctx, "drop table pgxpooltx")
  1029  	}()
  1030  
  1031  	err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
  1032  		_, err := db.Exec(ctx, "insert into pgxpooltx(id) values (1)")
  1033  		require.NoError(t, err)
  1034  
  1035  		err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
  1036  			_, err := db.Exec(ctx, "insert into pgxpooltx(id) values (2)")
  1037  			require.NoError(t, err)
  1038  			return errors.New("do a rollback")
  1039  		})
  1040  		require.EqualError(t, err, "do a rollback")
  1041  
  1042  		_, err = db.Exec(ctx, "insert into pgxpooltx(id) values (3)")
  1043  		require.NoError(t, err)
  1044  
  1045  		return nil
  1046  	})
  1047  	require.NoError(t, err)
  1048  
  1049  	var n int64
  1050  	err = db.QueryRow(ctx, "select count(*) from pgxpooltx").Scan(&n)
  1051  	require.NoError(t, err)
  1052  	require.EqualValues(t, 2, n)
  1053  }
  1054  
  1055  func TestIdempotentPoolClose(t *testing.T) {
  1056  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1057  	defer cancel()
  1058  
  1059  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
  1060  	require.NoError(t, err)
  1061  
  1062  	// Close the open pool.
  1063  	require.NotPanics(t, func() { pool.Close() })
  1064  
  1065  	// Close the already closed pool.
  1066  	require.NotPanics(t, func() { pool.Close() })
  1067  }
  1068  
  1069  func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
  1070  	t.Parallel()
  1071  
  1072  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1073  	defer cancel()
  1074  
  1075  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
  1076  	require.NoError(t, err)
  1077  
  1078  	config.MinConns = int32(12)
  1079  	config.MaxConns = int32(15)
  1080  
  1081  	acquireAttempts := int64(0)
  1082  	connectAttempts := int64(0)
  1083  
  1084  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
  1085  		atomic.AddInt64(&acquireAttempts, 1)
  1086  		return true
  1087  	}
  1088  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
  1089  		atomic.AddInt64(&connectAttempts, 1)
  1090  		return nil
  1091  	}
  1092  
  1093  	pool, err := pgxpool.NewWithConfig(ctx, config)
  1094  	require.NoError(t, err)
  1095  	defer pool.Close()
  1096  
  1097  	for i := 0; i < 500; i++ {
  1098  		time.Sleep(10 * time.Millisecond)
  1099  
  1100  		stat := pool.Stat()
  1101  		if stat.IdleConns() == 12 && stat.AcquireCount() == 0 && stat.TotalConns() == 12 && atomic.LoadInt64(&acquireAttempts) == 0 && atomic.LoadInt64(&connectAttempts) == 12 {
  1102  			return
  1103  		}
  1104  	}
  1105  
  1106  	t.Fatal("did not reach min pool size")
  1107  
  1108  }
  1109  
  1110  func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
  1111  	t.Parallel()
  1112  
  1113  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1114  	defer cancel()
  1115  
  1116  	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
  1117  	require.NoError(t, err)
  1118  	defer pool.Close()
  1119  
  1120  	errChan := make(chan error)
  1121  	testCount := 5000
  1122  
  1123  	for i := 0; i < testCount; i++ {
  1124  		go func() {
  1125  			batch := &pgx.Batch{}
  1126  			batch.Queue("select 1")
  1127  			batch.Queue("select 2")
  1128  
  1129  			br := pool.SendBatch(ctx, batch)
  1130  			defer br.Close()
  1131  
  1132  			var err error
  1133  			var n int32
  1134  			err = br.QueryRow().Scan(&n)
  1135  			if err != nil {
  1136  				errChan <- err
  1137  				return
  1138  			}
  1139  			if n != 1 {
  1140  				errChan <- fmt.Errorf("expected 1 got %v", n)
  1141  				return
  1142  			}
  1143  
  1144  			err = br.QueryRow().Scan(&n)
  1145  			if err != nil {
  1146  				errChan <- err
  1147  				return
  1148  			}
  1149  			if n != 2 {
  1150  				errChan <- fmt.Errorf("expected 2 got %v", n)
  1151  				return
  1152  			}
  1153  
  1154  			err = br.Close()
  1155  			errChan <- err
  1156  		}()
  1157  	}
  1158  
  1159  	for i := 0; i < testCount; i++ {
  1160  		err := <-errChan
  1161  		assert.NoError(t, err)
  1162  	}
  1163  }
  1164  

View as plain text