...

Source file src/github.com/jackc/pgx/v4/pgxpool/pool_test.go

Documentation: github.com/jackc/pgx/v4/pgxpool

     1  package pgxpool_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"sync/atomic"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/jackc/pgx/v4"
    13  	"github.com/jackc/pgx/v4/pgxpool"
    14  	"github.com/stretchr/testify/assert"
    15  	"github.com/stretchr/testify/require"
    16  )
    17  
    18  func TestConnect(t *testing.T) {
    19  	t.Parallel()
    20  	connString := os.Getenv("PGX_TEST_DATABASE")
    21  	pool, err := pgxpool.Connect(context.Background(), connString)
    22  	require.NoError(t, err)
    23  	assert.Equal(t, connString, pool.Config().ConnString())
    24  	pool.Close()
    25  }
    26  
    27  func TestConnectConfig(t *testing.T) {
    28  	t.Parallel()
    29  	connString := os.Getenv("PGX_TEST_DATABASE")
    30  	config, err := pgxpool.ParseConfig(connString)
    31  	require.NoError(t, err)
    32  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
    33  	require.NoError(t, err)
    34  	assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config")
    35  	pool.Close()
    36  }
    37  
    38  func TestParseConfigExtractsPoolArguments(t *testing.T) {
    39  	t.Parallel()
    40  
    41  	config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1")
    42  	assert.NoError(t, err)
    43  	assert.EqualValues(t, 42, config.MaxConns)
    44  	assert.EqualValues(t, 1, config.MinConns)
    45  	assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
    46  	assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
    47  }
    48  
    49  func TestConnectCancel(t *testing.T) {
    50  	t.Parallel()
    51  
    52  	ctx, cancel := context.WithCancel(context.Background())
    53  	cancel()
    54  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
    55  	assert.Nil(t, pool)
    56  	assert.Equal(t, context.Canceled, err)
    57  }
    58  
    59  func TestLazyConnect(t *testing.T) {
    60  	t.Parallel()
    61  
    62  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
    63  	assert.NoError(t, err)
    64  	config.LazyConnect = true
    65  
    66  	ctx, cancel := context.WithCancel(context.Background())
    67  	cancel()
    68  
    69  	pool, err := pgxpool.ConnectConfig(ctx, config)
    70  	assert.NoError(t, err)
    71  
    72  	_, err = pool.Exec(ctx, "SELECT 1")
    73  	assert.Equal(t, context.Canceled, err)
    74  }
    75  
    76  func TestBeforeConnectWithContextWithValueAndOneMinConn(t *testing.T) {
    77  	t.Parallel()
    78  
    79  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
    80  	assert.NoError(t, err)
    81  	config.MinConns = 1
    82  	config.BeforeConnect = func(ctx context.Context, config *pgx.ConnConfig) error {
    83  		val := ctx.Value("key")
    84  		if val == nil {
    85  			return errors.New("no value found with key 'key'")
    86  		}
    87  		return nil
    88  	}
    89  
    90  	ctx := context.WithValue(context.Background(), "key", "value")
    91  
    92  	_, err = pgxpool.ConnectConfig(ctx, config)
    93  	assert.NoError(t, err)
    94  }
    95  
    96  func TestConstructorIgnoresContext(t *testing.T) {
    97  	t.Parallel()
    98  
    99  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   100  	assert.NoError(t, err)
   101  	config.LazyConnect = true
   102  	var cancel func()
   103  	config.BeforeConnect = func(context.Context, *pgx.ConnConfig) error {
   104  		// cancel the query's context before we actually Dial to ensure the Dial's
   105  		// context isn't cancelled
   106  		cancel()
   107  		return nil
   108  	}
   109  
   110  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
   111  	require.NoError(t, err)
   112  
   113  	assert.EqualValues(t, 0, pool.Stat().TotalConns())
   114  
   115  	var ctx context.Context
   116  	ctx, cancel = context.WithCancel(context.Background())
   117  	defer cancel()
   118  	_, err = pool.Exec(ctx, "SELECT 1")
   119  	assert.ErrorIs(t, err, context.Canceled)
   120  	assert.EqualValues(t, 1, pool.Stat().TotalConns())
   121  }
   122  
   123  func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
   124  	t.Parallel()
   125  
   126  	config := &pgxpool.Config{}
   127  
   128  	require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.ConnectConfig(context.Background(), config) })
   129  }
   130  
   131  func TestConfigCopyReturnsEqualConfig(t *testing.T) {
   132  	connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
   133  	original, err := pgxpool.ParseConfig(connString)
   134  	require.NoError(t, err)
   135  
   136  	copied := original.Copy()
   137  
   138  	assertConfigsEqual(t, original, copied, t.Name())
   139  }
   140  
   141  func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
   142  	connString := os.Getenv("PGX_TEST_DATABASE")
   143  	original, err := pgxpool.ParseConfig(connString)
   144  	require.NoError(t, err)
   145  
   146  	copied := original.Copy()
   147  	assert.NotPanics(t, func() {
   148  		_, err = pgxpool.ConnectConfig(context.Background(), copied)
   149  	})
   150  	assert.NoError(t, err)
   151  }
   152  
   153  func TestPoolAcquireAndConnRelease(t *testing.T) {
   154  	t.Parallel()
   155  
   156  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   157  	require.NoError(t, err)
   158  	defer pool.Close()
   159  
   160  	c, err := pool.Acquire(context.Background())
   161  	require.NoError(t, err)
   162  	c.Release()
   163  }
   164  
   165  func TestPoolAcquireAndConnHijack(t *testing.T) {
   166  	t.Parallel()
   167  
   168  	ctx := context.Background()
   169  
   170  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   171  	require.NoError(t, err)
   172  	defer pool.Close()
   173  
   174  	c, err := pool.Acquire(ctx)
   175  	require.NoError(t, err)
   176  
   177  	connsBeforeHijack := pool.Stat().TotalConns()
   178  
   179  	conn := c.Hijack()
   180  	defer conn.Close(ctx)
   181  
   182  	connsAfterHijack := pool.Stat().TotalConns()
   183  	require.Equal(t, connsBeforeHijack-1, connsAfterHijack)
   184  
   185  	var n int32
   186  	err = conn.QueryRow(ctx, `select 1`).Scan(&n)
   187  	require.NoError(t, err)
   188  	require.Equal(t, int32(1), n)
   189  }
   190  
   191  func TestPoolAcquireFunc(t *testing.T) {
   192  	t.Parallel()
   193  
   194  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   195  	require.NoError(t, err)
   196  	defer pool.Close()
   197  
   198  	var n int32
   199  	err = pool.AcquireFunc(context.Background(), func(c *pgxpool.Conn) error {
   200  		return c.QueryRow(context.Background(), "select 1").Scan(&n)
   201  	})
   202  	require.NoError(t, err)
   203  	require.EqualValues(t, 1, n)
   204  }
   205  
   206  func TestPoolAcquireFuncReturnsFnError(t *testing.T) {
   207  	t.Parallel()
   208  
   209  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   210  	require.NoError(t, err)
   211  	defer pool.Close()
   212  
   213  	err = pool.AcquireFunc(context.Background(), func(c *pgxpool.Conn) error {
   214  		return fmt.Errorf("some error")
   215  	})
   216  	require.EqualError(t, err, "some error")
   217  }
   218  
   219  func TestPoolBeforeConnect(t *testing.T) {
   220  	t.Parallel()
   221  
   222  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   223  	require.NoError(t, err)
   224  
   225  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
   226  		cfg.Config.RuntimeParams["application_name"] = "pgx"
   227  		return nil
   228  	}
   229  
   230  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   231  	require.NoError(t, err)
   232  	defer db.Close()
   233  
   234  	var str string
   235  	err = db.QueryRow(context.Background(), "SHOW application_name").Scan(&str)
   236  	require.NoError(t, err)
   237  	assert.EqualValues(t, "pgx", str)
   238  }
   239  
   240  func TestPoolAfterConnect(t *testing.T) {
   241  	t.Parallel()
   242  
   243  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   244  	require.NoError(t, err)
   245  
   246  	config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
   247  		_, err := c.Prepare(ctx, "ps1", "select 1")
   248  		return err
   249  	}
   250  
   251  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   252  	require.NoError(t, err)
   253  	defer db.Close()
   254  
   255  	var n int32
   256  	err = db.QueryRow(context.Background(), "ps1").Scan(&n)
   257  	require.NoError(t, err)
   258  	assert.EqualValues(t, 1, n)
   259  }
   260  
   261  func TestPoolBeforeAcquire(t *testing.T) {
   262  	t.Parallel()
   263  
   264  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   265  	require.NoError(t, err)
   266  
   267  	acquireAttempts := 0
   268  
   269  	config.BeforeAcquire = func(ctx context.Context, c *pgx.Conn) bool {
   270  		acquireAttempts++
   271  		return acquireAttempts%2 == 0
   272  	}
   273  
   274  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   275  	require.NoError(t, err)
   276  	defer db.Close()
   277  
   278  	conns := make([]*pgxpool.Conn, 4)
   279  	for i := range conns {
   280  		conns[i], err = db.Acquire(context.Background())
   281  		assert.NoError(t, err)
   282  	}
   283  
   284  	for _, c := range conns {
   285  		c.Release()
   286  	}
   287  	waitForReleaseToComplete()
   288  
   289  	assert.EqualValues(t, 8, acquireAttempts)
   290  
   291  	conns = db.AcquireAllIdle(context.Background())
   292  	assert.Len(t, conns, 2)
   293  
   294  	for _, c := range conns {
   295  		c.Release()
   296  	}
   297  	waitForReleaseToComplete()
   298  
   299  	assert.EqualValues(t, 12, acquireAttempts)
   300  }
   301  
   302  func TestPoolAfterRelease(t *testing.T) {
   303  	t.Parallel()
   304  
   305  	func() {
   306  		pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   307  		require.NoError(t, err)
   308  		defer pool.Close()
   309  
   310  		err = pool.AcquireFunc(context.Background(), func(conn *pgxpool.Conn) error {
   311  			if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
   312  				t.Skip("Server does not support backend PID")
   313  			}
   314  			return nil
   315  		})
   316  		require.NoError(t, err)
   317  	}()
   318  
   319  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   320  	require.NoError(t, err)
   321  
   322  	afterReleaseCount := 0
   323  
   324  	config.AfterRelease = func(c *pgx.Conn) bool {
   325  		afterReleaseCount++
   326  		return afterReleaseCount%2 == 1
   327  	}
   328  
   329  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   330  	require.NoError(t, err)
   331  	defer db.Close()
   332  
   333  	connPIDs := map[uint32]struct{}{}
   334  
   335  	for i := 0; i < 10; i++ {
   336  		conn, err := db.Acquire(context.Background())
   337  		assert.NoError(t, err)
   338  		connPIDs[conn.Conn().PgConn().PID()] = struct{}{}
   339  		conn.Release()
   340  		waitForReleaseToComplete()
   341  	}
   342  
   343  	assert.EqualValues(t, 5, len(connPIDs))
   344  }
   345  
   346  func TestPoolAcquireAllIdle(t *testing.T) {
   347  	t.Parallel()
   348  
   349  	db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   350  	require.NoError(t, err)
   351  	defer db.Close()
   352  
   353  	conns := db.AcquireAllIdle(context.Background())
   354  	assert.Len(t, conns, 1)
   355  
   356  	for _, c := range conns {
   357  		c.Release()
   358  	}
   359  	waitForReleaseToComplete()
   360  
   361  	conns = make([]*pgxpool.Conn, 3)
   362  	for i := range conns {
   363  		conns[i], err = db.Acquire(context.Background())
   364  		assert.NoError(t, err)
   365  	}
   366  
   367  	for _, c := range conns {
   368  		if c != nil {
   369  			c.Release()
   370  		}
   371  	}
   372  	waitForReleaseToComplete()
   373  
   374  	conns = db.AcquireAllIdle(context.Background())
   375  	assert.Len(t, conns, 3)
   376  
   377  	for _, c := range conns {
   378  		c.Release()
   379  	}
   380  }
   381  
   382  func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
   383  	t.Parallel()
   384  
   385  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   386  	require.NoError(t, err)
   387  
   388  	config.MaxConnLifetime = 250 * time.Millisecond
   389  
   390  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   391  	require.NoError(t, err)
   392  	defer db.Close()
   393  
   394  	c, err := db.Acquire(context.Background())
   395  	require.NoError(t, err)
   396  
   397  	time.Sleep(config.MaxConnLifetime)
   398  
   399  	c.Release()
   400  	waitForReleaseToComplete()
   401  
   402  	stats := db.Stat()
   403  	assert.EqualValues(t, 0, stats.TotalConns())
   404  }
   405  
   406  func TestConnReleaseClosesBusyConn(t *testing.T) {
   407  	t.Parallel()
   408  
   409  	db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   410  	require.NoError(t, err)
   411  	defer db.Close()
   412  
   413  	c, err := db.Acquire(context.Background())
   414  	require.NoError(t, err)
   415  
   416  	_, err = c.Query(context.Background(), "select generate_series(1,10)")
   417  	require.NoError(t, err)
   418  
   419  	c.Release()
   420  	waitForReleaseToComplete()
   421  
   422  	// wait for the connection to actually be destroyed
   423  	for i := 0; i < 1000; i++ {
   424  		if db.Stat().TotalConns() == 0 {
   425  			break
   426  		}
   427  		time.Sleep(time.Millisecond)
   428  	}
   429  
   430  	stats := db.Stat()
   431  	assert.EqualValues(t, 0, stats.TotalConns())
   432  }
   433  
   434  func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
   435  	t.Parallel()
   436  
   437  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   438  	require.NoError(t, err)
   439  
   440  	config.MaxConnLifetime = 100 * time.Millisecond
   441  	config.HealthCheckPeriod = 100 * time.Millisecond
   442  
   443  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   444  	require.NoError(t, err)
   445  	defer db.Close()
   446  
   447  	c, err := db.Acquire(context.Background())
   448  	require.NoError(t, err)
   449  	c.Release()
   450  	time.Sleep(config.MaxConnLifetime + 500*time.Millisecond)
   451  
   452  	stats := db.Stat()
   453  	assert.EqualValues(t, 0, stats.TotalConns())
   454  	assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
   455  	assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount())
   456  }
   457  
   458  func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
   459  	t.Parallel()
   460  
   461  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   462  	require.NoError(t, err)
   463  
   464  	config.MaxConnLifetime = 1 * time.Minute
   465  	config.MaxConnIdleTime = 100 * time.Millisecond
   466  	config.HealthCheckPeriod = 150 * time.Millisecond
   467  
   468  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   469  	require.NoError(t, err)
   470  	defer db.Close()
   471  
   472  	c, err := db.Acquire(context.Background())
   473  	require.NoError(t, err)
   474  	c.Release()
   475  	time.Sleep(config.HealthCheckPeriod)
   476  
   477  	for i := 0; i < 1000; i++ {
   478  		if db.Stat().TotalConns() == 0 {
   479  			break
   480  		}
   481  		time.Sleep(time.Millisecond)
   482  	}
   483  
   484  	stats := db.Stat()
   485  	assert.EqualValues(t, 0, stats.TotalConns())
   486  	assert.EqualValues(t, 1, stats.MaxIdleDestroyCount())
   487  	assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
   488  }
   489  
   490  func TestPoolBackgroundChecksMinConns(t *testing.T) {
   491  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   492  	require.NoError(t, err)
   493  
   494  	config.HealthCheckPeriod = 100 * time.Millisecond
   495  	config.MinConns = 2
   496  
   497  	db, err := pgxpool.ConnectConfig(context.Background(), config)
   498  	require.NoError(t, err)
   499  	defer db.Close()
   500  
   501  	time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond)
   502  
   503  	stats := db.Stat()
   504  	assert.EqualValues(t, 2, stats.TotalConns())
   505  	assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
   506  	assert.EqualValues(t, 2, stats.NewConnsCount())
   507  
   508  	c, err := db.Acquire(context.Background())
   509  	require.NoError(t, err)
   510  	err = c.Conn().Close(context.Background())
   511  	require.NoError(t, err)
   512  	c.Release()
   513  
   514  	time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond)
   515  
   516  	stats = db.Stat()
   517  	assert.EqualValues(t, 2, stats.TotalConns())
   518  	assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
   519  	assert.EqualValues(t, 3, stats.NewConnsCount())
   520  }
   521  
   522  func TestPoolExec(t *testing.T) {
   523  	t.Parallel()
   524  
   525  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   526  	require.NoError(t, err)
   527  	defer pool.Close()
   528  
   529  	testExec(t, pool)
   530  }
   531  
   532  func TestPoolQuery(t *testing.T) {
   533  	t.Parallel()
   534  
   535  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   536  	require.NoError(t, err)
   537  	defer pool.Close()
   538  
   539  	// Test common usage
   540  	testQuery(t, pool)
   541  	waitForReleaseToComplete()
   542  
   543  	// Test expected pool behavior
   544  	rows, err := pool.Query(context.Background(), "select generate_series(1,$1)", 10)
   545  	require.NoError(t, err)
   546  
   547  	stats := pool.Stat()
   548  	assert.EqualValues(t, 1, stats.AcquiredConns())
   549  	assert.EqualValues(t, 1, stats.TotalConns())
   550  
   551  	rows.Close()
   552  	assert.NoError(t, rows.Err())
   553  	waitForReleaseToComplete()
   554  
   555  	stats = pool.Stat()
   556  	assert.EqualValues(t, 0, stats.AcquiredConns())
   557  	assert.EqualValues(t, 1, stats.TotalConns())
   558  
   559  }
   560  
   561  func TestPoolQueryRow(t *testing.T) {
   562  	t.Parallel()
   563  
   564  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   565  	require.NoError(t, err)
   566  	defer pool.Close()
   567  
   568  	testQueryRow(t, pool)
   569  	waitForReleaseToComplete()
   570  
   571  	stats := pool.Stat()
   572  	assert.EqualValues(t, 0, stats.AcquiredConns())
   573  	assert.EqualValues(t, 1, stats.TotalConns())
   574  }
   575  
   576  // https://github.com/jackc/pgx/issues/677
   577  func TestPoolQueryRowErrNoRows(t *testing.T) {
   578  	t.Parallel()
   579  
   580  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   581  	require.NoError(t, err)
   582  	defer pool.Close()
   583  
   584  	err = pool.QueryRow(context.Background(), "select n from generate_series(1,10) n where n=0").Scan(nil)
   585  	require.Equal(t, pgx.ErrNoRows, err)
   586  }
   587  
   588  func TestPoolSendBatch(t *testing.T) {
   589  	t.Parallel()
   590  
   591  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   592  	require.NoError(t, err)
   593  	defer pool.Close()
   594  
   595  	testSendBatch(t, pool)
   596  	waitForReleaseToComplete()
   597  
   598  	stats := pool.Stat()
   599  	assert.EqualValues(t, 0, stats.AcquiredConns())
   600  	assert.EqualValues(t, 1, stats.TotalConns())
   601  }
   602  
   603  func TestPoolCopyFrom(t *testing.T) {
   604  	// Not able to use testCopyFrom because it relies on temporary tables and the pool may run subsequent calls under
   605  	// different connections.
   606  	t.Parallel()
   607  
   608  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   609  	defer cancel()
   610  
   611  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   612  	require.NoError(t, err)
   613  	defer pool.Close()
   614  
   615  	_, err = pool.Exec(ctx, `drop table if exists poolcopyfromtest`)
   616  	require.NoError(t, err)
   617  
   618  	_, err = pool.Exec(ctx, `create table poolcopyfromtest(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
   619  	require.NoError(t, err)
   620  	defer pool.Exec(ctx, `drop table poolcopyfromtest`)
   621  
   622  	tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
   623  
   624  	inputRows := [][]interface{}{
   625  		{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
   626  		{nil, nil, nil, nil, nil, nil, nil},
   627  	}
   628  
   629  	copyCount, err := pool.CopyFrom(ctx, pgx.Identifier{"poolcopyfromtest"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
   630  	assert.NoError(t, err)
   631  	assert.EqualValues(t, len(inputRows), copyCount)
   632  
   633  	rows, err := pool.Query(ctx, "select * from poolcopyfromtest")
   634  	assert.NoError(t, err)
   635  
   636  	var outputRows [][]interface{}
   637  	for rows.Next() {
   638  		row, err := rows.Values()
   639  		if err != nil {
   640  			t.Errorf("Unexpected error for rows.Values(): %v", err)
   641  		}
   642  		outputRows = append(outputRows, row)
   643  	}
   644  
   645  	assert.NoError(t, rows.Err())
   646  	assert.Equal(t, inputRows, outputRows)
   647  }
   648  
   649  func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
   650  	t.Parallel()
   651  
   652  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   653  	defer cancel()
   654  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   655  	require.NoError(t, err)
   656  	defer pool.Close()
   657  
   658  	err = pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
   659  		if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
   660  			t.Skip("Server does not support backend PID")
   661  		}
   662  		return nil
   663  	})
   664  	require.NoError(t, err)
   665  
   666  	c, err := pool.Acquire(ctx)
   667  	require.NoError(t, err)
   668  
   669  	pid := c.Conn().PgConn().PID()
   670  
   671  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   672  
   673  	_, err = c.Exec(ctx, "begin")
   674  	assert.NoError(t, err)
   675  
   676  	assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
   677  
   678  	_, err = c.Exec(ctx, "selct")
   679  	assert.Error(t, err)
   680  
   681  	assert.Equal(t, byte('E'), c.Conn().PgConn().TxStatus())
   682  
   683  	c.Release()
   684  	waitForReleaseToComplete()
   685  
   686  	c, err = pool.Acquire(ctx)
   687  	require.NoError(t, err)
   688  
   689  	assert.NotEqual(t, pid, c.Conn().PgConn().PID())
   690  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   691  
   692  	c.Release()
   693  }
   694  
   695  func TestConnReleaseClosesConnInTransaction(t *testing.T) {
   696  	t.Parallel()
   697  
   698  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   699  	defer cancel()
   700  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   701  	require.NoError(t, err)
   702  	defer pool.Close()
   703  
   704  	err = pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
   705  		if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
   706  			t.Skip("Server does not support backend PID")
   707  		}
   708  		return nil
   709  	})
   710  	require.NoError(t, err)
   711  
   712  	c, err := pool.Acquire(ctx)
   713  	require.NoError(t, err)
   714  
   715  	pid := c.Conn().PgConn().PID()
   716  
   717  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   718  
   719  	_, err = c.Exec(ctx, "begin")
   720  	assert.NoError(t, err)
   721  
   722  	assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
   723  
   724  	c.Release()
   725  	waitForReleaseToComplete()
   726  
   727  	c, err = pool.Acquire(ctx)
   728  	require.NoError(t, err)
   729  
   730  	assert.NotEqual(t, pid, c.Conn().PgConn().PID())
   731  	assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
   732  
   733  	c.Release()
   734  }
   735  
   736  func TestConnReleaseDestroysClosedConn(t *testing.T) {
   737  	t.Parallel()
   738  
   739  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   740  	defer cancel()
   741  	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   742  	require.NoError(t, err)
   743  	defer pool.Close()
   744  
   745  	c, err := pool.Acquire(ctx)
   746  	require.NoError(t, err)
   747  
   748  	err = c.Conn().Close(ctx)
   749  	require.NoError(t, err)
   750  
   751  	assert.EqualValues(t, 1, pool.Stat().TotalConns())
   752  
   753  	c.Release()
   754  	waitForReleaseToComplete()
   755  
   756  	// wait for the connection to actually be destroyed
   757  	for i := 0; i < 1000; i++ {
   758  		if pool.Stat().TotalConns() == 0 {
   759  			break
   760  		}
   761  		time.Sleep(time.Millisecond)
   762  	}
   763  
   764  	assert.EqualValues(t, 0, pool.Stat().TotalConns())
   765  }
   766  
   767  func TestConnPoolQueryConcurrentLoad(t *testing.T) {
   768  	t.Parallel()
   769  
   770  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   771  	require.NoError(t, err)
   772  	defer pool.Close()
   773  
   774  	n := 100
   775  	done := make(chan bool)
   776  
   777  	for i := 0; i < n; i++ {
   778  		go func() {
   779  			defer func() { done <- true }()
   780  			testQuery(t, pool)
   781  			testQueryRow(t, pool)
   782  		}()
   783  	}
   784  
   785  	for i := 0; i < n; i++ {
   786  		<-done
   787  	}
   788  }
   789  
   790  func TestConnReleaseWhenBeginFail(t *testing.T) {
   791  	t.Parallel()
   792  
   793  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   794  	defer cancel()
   795  
   796  	db, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
   797  	require.NoError(t, err)
   798  	defer db.Close()
   799  
   800  	tx, err := db.BeginTx(ctx, pgx.TxOptions{
   801  		IsoLevel: pgx.TxIsoLevel("foo"),
   802  	})
   803  	assert.Error(t, err)
   804  	if !assert.Zero(t, tx) {
   805  		err := tx.Rollback(ctx)
   806  		assert.NoError(t, err)
   807  	}
   808  
   809  	for i := 0; i < 1000; i++ {
   810  		if db.Stat().TotalConns() == 0 {
   811  			break
   812  		}
   813  		time.Sleep(time.Millisecond)
   814  	}
   815  
   816  	assert.EqualValues(t, 0, db.Stat().TotalConns())
   817  }
   818  
   819  func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
   820  	db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   821  	require.NoError(t, err)
   822  	defer db.Close()
   823  
   824  	createSql := `
   825  		drop table if exists pgxpooltx;
   826      create temporary table pgxpooltx(
   827        id integer,
   828        unique (id)
   829      );
   830    `
   831  
   832  	_, err = db.Exec(context.Background(), createSql)
   833  	require.NoError(t, err)
   834  
   835  	defer func() {
   836  		db.Exec(context.Background(), "drop table pgxpooltx")
   837  	}()
   838  
   839  	err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
   840  		_, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (1)")
   841  		require.NoError(t, err)
   842  
   843  		err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
   844  			_, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (2)")
   845  			require.NoError(t, err)
   846  
   847  			err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
   848  				_, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (3)")
   849  				require.NoError(t, err)
   850  				return nil
   851  			})
   852  			require.NoError(t, err)
   853  			return nil
   854  		})
   855  		require.NoError(t, err)
   856  		return nil
   857  	})
   858  	require.NoError(t, err)
   859  
   860  	var n int64
   861  	err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n)
   862  	require.NoError(t, err)
   863  	require.EqualValues(t, 3, n)
   864  }
   865  
   866  func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
   867  	db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   868  	require.NoError(t, err)
   869  	defer db.Close()
   870  
   871  	createSql := `
   872  		drop table if exists pgxpooltx;
   873      create temporary table pgxpooltx(
   874        id integer,
   875        unique (id)
   876      );
   877    `
   878  
   879  	_, err = db.Exec(context.Background(), createSql)
   880  	require.NoError(t, err)
   881  
   882  	defer func() {
   883  		db.Exec(context.Background(), "drop table pgxpooltx")
   884  	}()
   885  
   886  	err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
   887  		_, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (1)")
   888  		require.NoError(t, err)
   889  
   890  		err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
   891  			_, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (2)")
   892  			require.NoError(t, err)
   893  			return errors.New("do a rollback")
   894  		})
   895  		require.EqualError(t, err, "do a rollback")
   896  
   897  		_, err = db.Exec(context.Background(), "insert into pgxpooltx(id) values (3)")
   898  		require.NoError(t, err)
   899  
   900  		return nil
   901  	})
   902  	require.NoError(t, err)
   903  
   904  	var n int64
   905  	err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n)
   906  	require.NoError(t, err)
   907  	require.EqualValues(t, 2, n)
   908  }
   909  
   910  func TestIdempotentPoolClose(t *testing.T) {
   911  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
   912  	require.NoError(t, err)
   913  
   914  	// Close the open pool.
   915  	require.NotPanics(t, func() { pool.Close() })
   916  
   917  	// Close the already closed pool.
   918  	require.NotPanics(t, func() { pool.Close() })
   919  }
   920  
   921  func TestConnectCreatesMinPool(t *testing.T) {
   922  	t.Parallel()
   923  
   924  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   925  	require.NoError(t, err)
   926  
   927  	config.MinConns = int32(12)
   928  	config.MaxConns = int32(15)
   929  	config.LazyConnect = false
   930  
   931  	acquireAttempts := int64(0)
   932  	connectAttempts := int64(0)
   933  
   934  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
   935  		atomic.AddInt64(&acquireAttempts, 1)
   936  		return true
   937  	}
   938  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
   939  		atomic.AddInt64(&connectAttempts, 1)
   940  		return nil
   941  	}
   942  
   943  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
   944  	require.NoError(t, err)
   945  	defer pool.Close()
   946  
   947  	stat := pool.Stat()
   948  	require.Equal(t, int32(12), stat.IdleConns())
   949  	require.Equal(t, int64(1), stat.AcquireCount())
   950  	require.Equal(t, int32(12), stat.TotalConns())
   951  	require.Equal(t, int64(0), acquireAttempts)
   952  	require.Equal(t, int64(12), connectAttempts)
   953  }
   954  func TestConnectSkipMinPoolWithLazy(t *testing.T) {
   955  	t.Parallel()
   956  
   957  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   958  	require.NoError(t, err)
   959  
   960  	config.MinConns = int32(12)
   961  	config.MaxConns = int32(15)
   962  	config.LazyConnect = true
   963  
   964  	acquireAttempts := int64(0)
   965  	connectAttempts := int64(0)
   966  
   967  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
   968  		atomic.AddInt64(&acquireAttempts, 1)
   969  		return true
   970  	}
   971  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
   972  		atomic.AddInt64(&connectAttempts, 1)
   973  		return nil
   974  	}
   975  
   976  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
   977  	require.NoError(t, err)
   978  	defer pool.Close()
   979  
   980  	stat := pool.Stat()
   981  	require.Equal(t, int32(0), stat.IdleConns())
   982  	require.Equal(t, int64(0), stat.AcquireCount())
   983  	require.Equal(t, int32(0), stat.TotalConns())
   984  	require.Equal(t, int64(0), acquireAttempts)
   985  	require.Equal(t, int64(0), connectAttempts)
   986  }
   987  
   988  func TestConnectMinPoolZero(t *testing.T) {
   989  	t.Parallel()
   990  
   991  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
   992  	require.NoError(t, err)
   993  
   994  	config.MinConns = int32(0)
   995  	config.MaxConns = int32(15)
   996  	config.LazyConnect = false
   997  
   998  	acquireAttempts := int64(0)
   999  	connectAttempts := int64(0)
  1000  
  1001  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
  1002  		atomic.AddInt64(&acquireAttempts, 1)
  1003  		return true
  1004  	}
  1005  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
  1006  		atomic.AddInt64(&connectAttempts, 1)
  1007  		return nil
  1008  	}
  1009  
  1010  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
  1011  	require.NoError(t, err)
  1012  	defer pool.Close()
  1013  
  1014  	stat := pool.Stat()
  1015  	require.Equal(t, int32(1), stat.IdleConns())
  1016  	require.Equal(t, int64(1), stat.AcquireCount())
  1017  	require.Equal(t, int32(1), stat.TotalConns())
  1018  	require.Equal(t, int64(0), acquireAttempts)
  1019  	require.Equal(t, int64(1), connectAttempts)
  1020  }
  1021  
  1022  func TestCreateMinPoolClosesConnectionsOnError(t *testing.T) {
  1023  	t.Parallel()
  1024  
  1025  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
  1026  	require.NoError(t, err)
  1027  
  1028  	config.MinConns = int32(12)
  1029  	config.MaxConns = int32(15)
  1030  	config.LazyConnect = false
  1031  
  1032  	acquireAttempts := int64(0)
  1033  	madeConnections := int64(0)
  1034  	conns := make(chan *pgx.Conn, 15)
  1035  
  1036  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
  1037  		atomic.AddInt64(&acquireAttempts, 1)
  1038  		return true
  1039  	}
  1040  	config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
  1041  		conns <- conn
  1042  
  1043  		atomic.AddInt64(&madeConnections, 1)
  1044  		mc := atomic.LoadInt64(&madeConnections)
  1045  		if mc == 10 {
  1046  			return errors.New("mock error")
  1047  		}
  1048  		return nil
  1049  	}
  1050  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
  1051  	require.Error(t, err)
  1052  	require.Nil(t, pool)
  1053  
  1054  	close(conns)
  1055  	for conn := range conns {
  1056  		require.True(t, conn.IsClosed())
  1057  	}
  1058  
  1059  	require.Equal(t, int64(0), acquireAttempts)
  1060  	require.True(t, madeConnections >= 10, "Expected %d got %d", 10, madeConnections)
  1061  }
  1062  
  1063  func TestCreateMinPoolReturnsFirstError(t *testing.T) {
  1064  	t.Parallel()
  1065  
  1066  	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
  1067  	require.NoError(t, err)
  1068  
  1069  	config.MinConns = int32(12)
  1070  	config.MaxConns = int32(15)
  1071  	config.LazyConnect = false
  1072  
  1073  	acquireAttempts := int64(0)
  1074  	connectAttempts := int64(0)
  1075  
  1076  	mockErr := errors.New("mock connect error")
  1077  
  1078  	config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
  1079  		atomic.AddInt64(&acquireAttempts, 1)
  1080  		return true
  1081  	}
  1082  	config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
  1083  		atomic.AddInt64(&connectAttempts, 1)
  1084  		ca := atomic.LoadInt64(&connectAttempts)
  1085  		if ca >= 5 {
  1086  			return mockErr
  1087  		}
  1088  		return nil
  1089  	}
  1090  
  1091  	pool, err := pgxpool.ConnectConfig(context.Background(), config)
  1092  	require.Nil(t, pool)
  1093  	require.Error(t, err)
  1094  
  1095  	require.True(t, connectAttempts >= 5, "Expected %d got %d", 5, connectAttempts)
  1096  	require.ErrorIs(t, err, mockErr)
  1097  }
  1098  
  1099  func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
  1100  	t.Parallel()
  1101  
  1102  	pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
  1103  	require.NoError(t, err)
  1104  	defer pool.Close()
  1105  
  1106  	errChan := make(chan error)
  1107  	testCount := 5000
  1108  
  1109  	for i := 0; i < testCount; i++ {
  1110  		go func() {
  1111  			batch := &pgx.Batch{}
  1112  			batch.Queue("select 1")
  1113  			batch.Queue("select 2")
  1114  
  1115  			br := pool.SendBatch(context.Background(), batch)
  1116  			defer br.Close()
  1117  
  1118  			var err error
  1119  			var n int32
  1120  			err = br.QueryRow().Scan(&n)
  1121  			if err != nil {
  1122  				errChan <- err
  1123  				return
  1124  			}
  1125  			if n != 1 {
  1126  				errChan <- fmt.Errorf("expected 1 got %v", n)
  1127  				return
  1128  			}
  1129  
  1130  			err = br.QueryRow().Scan(&n)
  1131  			if err != nil {
  1132  				errChan <- err
  1133  				return
  1134  			}
  1135  			if n != 2 {
  1136  				errChan <- fmt.Errorf("expected 2 got %v", n)
  1137  				return
  1138  			}
  1139  
  1140  			err = br.Close()
  1141  			errChan <- err
  1142  		}()
  1143  	}
  1144  
  1145  	for i := 0; i < testCount; i++ {
  1146  		err := <-errChan
  1147  		assert.NoError(t, err)
  1148  	}
  1149  }
  1150  

View as plain text