...

Source file src/github.com/jackc/pgx/v5/conn_test.go

Documentation: github.com/jackc/pgx/v5

     1  package pgx_test
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"os"
     7  	"strings"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/jackc/pgx/v5"
    13  	"github.com/jackc/pgx/v5/pgconn"
    14  	"github.com/jackc/pgx/v5/pgtype"
    15  	"github.com/jackc/pgx/v5/pgxtest"
    16  	"github.com/stretchr/testify/assert"
    17  	"github.com/stretchr/testify/require"
    18  )
    19  
    20  func TestCrateDBConnect(t *testing.T) {
    21  	t.Parallel()
    22  
    23  	connString := os.Getenv("PGX_TEST_CRATEDB_CONN_STRING")
    24  	if connString == "" {
    25  		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_CRATEDB_CONN_STRING")
    26  	}
    27  
    28  	conn, err := pgx.Connect(context.Background(), connString)
    29  	require.Nil(t, err)
    30  	defer closeConn(t, conn)
    31  
    32  	assert.Equal(t, connString, conn.Config().ConnString())
    33  
    34  	var result int
    35  	err = conn.QueryRow(context.Background(), "select 1 +1").Scan(&result)
    36  	if err != nil {
    37  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    38  	}
    39  	if result != 2 {
    40  		t.Errorf("bad result: %d", result)
    41  	}
    42  }
    43  
    44  func TestConnect(t *testing.T) {
    45  	t.Parallel()
    46  
    47  	connString := os.Getenv("PGX_TEST_DATABASE")
    48  	config := mustParseConfig(t, connString)
    49  
    50  	conn, err := pgx.ConnectConfig(context.Background(), config)
    51  	if err != nil {
    52  		t.Fatalf("Unable to establish connection: %v", err)
    53  	}
    54  
    55  	assertConfigsEqual(t, config, conn.Config(), "Conn.Config() returns original config")
    56  
    57  	var currentDB string
    58  	err = conn.QueryRow(context.Background(), "select current_database()").Scan(&currentDB)
    59  	if err != nil {
    60  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    61  	}
    62  	if currentDB != config.Config.Database {
    63  		t.Errorf("Did not connect to specified database (%v)", config.Config.Database)
    64  	}
    65  
    66  	var user string
    67  	err = conn.QueryRow(context.Background(), "select current_user").Scan(&user)
    68  	if err != nil {
    69  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    70  	}
    71  	if user != config.Config.User {
    72  		t.Errorf("Did not connect as specified user (%v)", config.Config.User)
    73  	}
    74  
    75  	err = conn.Close(context.Background())
    76  	if err != nil {
    77  		t.Fatal("Unable to close connection")
    78  	}
    79  }
    80  
    81  func TestConnectWithPreferSimpleProtocol(t *testing.T) {
    82  	t.Parallel()
    83  
    84  	connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
    85  	connConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
    86  
    87  	conn := mustConnect(t, connConfig)
    88  	defer closeConn(t, conn)
    89  
    90  	// If simple protocol is used we should be able to correctly scan the result
    91  	// into a pgtype.Text as the integer will have been encoded in text.
    92  
    93  	var s pgtype.Text
    94  	err := conn.QueryRow(context.Background(), "select $1::int4", 42).Scan(&s)
    95  	require.NoError(t, err)
    96  	require.Equal(t, pgtype.Text{String: "42", Valid: true}, s)
    97  
    98  	ensureConnValid(t, conn)
    99  }
   100  
   101  func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
   102  	config := &pgx.ConnConfig{}
   103  	require.PanicsWithValue(t, "config must be created by ParseConfig", func() {
   104  		pgx.ConnectConfig(context.Background(), config)
   105  	})
   106  }
   107  
   108  func TestConfigContainsConnStr(t *testing.T) {
   109  	connStr := os.Getenv("PGX_TEST_DATABASE")
   110  	config, err := pgx.ParseConfig(connStr)
   111  	require.NoError(t, err)
   112  	assert.Equal(t, connStr, config.ConnString())
   113  }
   114  
   115  func TestConfigCopyReturnsEqualConfig(t *testing.T) {
   116  	connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
   117  	original, err := pgx.ParseConfig(connString)
   118  	require.NoError(t, err)
   119  
   120  	copied := original.Copy()
   121  	assertConfigsEqual(t, original, copied, t.Name())
   122  }
   123  
   124  func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
   125  	connString := os.Getenv("PGX_TEST_DATABASE")
   126  	original, err := pgx.ParseConfig(connString)
   127  	require.NoError(t, err)
   128  
   129  	copied := original.Copy()
   130  	assert.NotPanics(t, func() {
   131  		_, err = pgx.ConnectConfig(context.Background(), copied)
   132  	})
   133  	assert.NoError(t, err)
   134  }
   135  
   136  func TestParseConfigExtractsStatementCacheOptions(t *testing.T) {
   137  	t.Parallel()
   138  
   139  	config, err := pgx.ParseConfig("statement_cache_capacity=0")
   140  	require.NoError(t, err)
   141  	require.EqualValues(t, 0, config.StatementCacheCapacity)
   142  
   143  	config, err = pgx.ParseConfig("statement_cache_capacity=42")
   144  	require.NoError(t, err)
   145  	require.EqualValues(t, 42, config.StatementCacheCapacity)
   146  
   147  	config, err = pgx.ParseConfig("description_cache_capacity=0")
   148  	require.NoError(t, err)
   149  	require.EqualValues(t, 0, config.DescriptionCacheCapacity)
   150  
   151  	config, err = pgx.ParseConfig("description_cache_capacity=42")
   152  	require.NoError(t, err)
   153  	require.EqualValues(t, 42, config.DescriptionCacheCapacity)
   154  
   155  	//	default_query_exec_mode
   156  	//		Possible values: "cache_statement", "cache_describe", "describe_exec", "exec", and "simple_protocol". See
   157  
   158  	config, err = pgx.ParseConfig("default_query_exec_mode=cache_statement")
   159  	require.NoError(t, err)
   160  	require.Equal(t, pgx.QueryExecModeCacheStatement, config.DefaultQueryExecMode)
   161  
   162  	config, err = pgx.ParseConfig("default_query_exec_mode=cache_describe")
   163  	require.NoError(t, err)
   164  	require.Equal(t, pgx.QueryExecModeCacheDescribe, config.DefaultQueryExecMode)
   165  
   166  	config, err = pgx.ParseConfig("default_query_exec_mode=describe_exec")
   167  	require.NoError(t, err)
   168  	require.Equal(t, pgx.QueryExecModeDescribeExec, config.DefaultQueryExecMode)
   169  
   170  	config, err = pgx.ParseConfig("default_query_exec_mode=exec")
   171  	require.NoError(t, err)
   172  	require.Equal(t, pgx.QueryExecModeExec, config.DefaultQueryExecMode)
   173  
   174  	config, err = pgx.ParseConfig("default_query_exec_mode=simple_protocol")
   175  	require.NoError(t, err)
   176  	require.Equal(t, pgx.QueryExecModeSimpleProtocol, config.DefaultQueryExecMode)
   177  }
   178  
   179  func TestParseConfigExtractsDefaultQueryExecMode(t *testing.T) {
   180  	t.Parallel()
   181  
   182  	for _, tt := range []struct {
   183  		connString           string
   184  		defaultQueryExecMode pgx.QueryExecMode
   185  	}{
   186  		{"", pgx.QueryExecModeCacheStatement},
   187  		{"default_query_exec_mode=cache_statement", pgx.QueryExecModeCacheStatement},
   188  		{"default_query_exec_mode=cache_describe", pgx.QueryExecModeCacheDescribe},
   189  		{"default_query_exec_mode=describe_exec", pgx.QueryExecModeDescribeExec},
   190  		{"default_query_exec_mode=exec", pgx.QueryExecModeExec},
   191  		{"default_query_exec_mode=simple_protocol", pgx.QueryExecModeSimpleProtocol},
   192  	} {
   193  		config, err := pgx.ParseConfig(tt.connString)
   194  		require.NoError(t, err)
   195  		require.Equalf(t, tt.defaultQueryExecMode, config.DefaultQueryExecMode, "connString: `%s`", tt.connString)
   196  		require.Empty(t, config.RuntimeParams["default_query_exec_mode"])
   197  	}
   198  }
   199  
   200  func TestParseConfigErrors(t *testing.T) {
   201  	t.Parallel()
   202  
   203  	for _, tt := range []struct {
   204  		connString           string
   205  		expectedErrSubstring string
   206  	}{
   207  		{"default_query_exec_mode=does_not_exist", "does_not_exist"},
   208  	} {
   209  		config, err := pgx.ParseConfig(tt.connString)
   210  		require.Nil(t, config)
   211  		require.ErrorContains(t, err, tt.expectedErrSubstring)
   212  	}
   213  }
   214  
   215  func TestExec(t *testing.T) {
   216  	t.Parallel()
   217  
   218  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   219  	defer cancel()
   220  
   221  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   222  		if results := mustExec(t, conn, "create temporary table foo(id integer primary key);"); results.String() != "CREATE TABLE" {
   223  			t.Error("Unexpected results from Exec")
   224  		}
   225  
   226  		// Accept parameters
   227  		if results := mustExec(t, conn, "insert into foo(id) values($1)", 1); results.String() != "INSERT 0 1" {
   228  			t.Errorf("Unexpected results from Exec: %v", results)
   229  		}
   230  
   231  		if results := mustExec(t, conn, "drop table foo;"); results.String() != "DROP TABLE" {
   232  			t.Error("Unexpected results from Exec")
   233  		}
   234  
   235  		// Multiple statements can be executed -- last command tag is returned
   236  		if results := mustExec(t, conn, "create temporary table foo(id serial primary key); drop table foo;"); results.String() != "DROP TABLE" {
   237  			t.Error("Unexpected results from Exec")
   238  		}
   239  
   240  		// Can execute longer SQL strings than sharedBufferSize
   241  		if results := mustExec(t, conn, strings.Repeat("select 42; ", 1000)); results.String() != "SELECT 1" {
   242  			t.Errorf("Unexpected results from Exec: %v", results)
   243  		}
   244  
   245  		// Exec no-op which does not return a command tag
   246  		if results := mustExec(t, conn, "--;"); results.String() != "" {
   247  			t.Errorf("Unexpected results from Exec: %v", results)
   248  		}
   249  	})
   250  }
   251  
   252  type testQueryRewriter struct {
   253  	sql  string
   254  	args []any
   255  }
   256  
   257  func (qr *testQueryRewriter) RewriteQuery(ctx context.Context, conn *pgx.Conn, sql string, args []any) (newSQL string, newArgs []any, err error) {
   258  	return qr.sql, qr.args, nil
   259  }
   260  
   261  func TestExecWithQueryRewriter(t *testing.T) {
   262  	t.Parallel()
   263  
   264  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   265  	defer cancel()
   266  
   267  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   268  		qr := testQueryRewriter{sql: "select $1::int", args: []any{42}}
   269  		_, err := conn.Exec(ctx, "should be replaced", &qr)
   270  		require.NoError(t, err)
   271  	})
   272  }
   273  
   274  func TestExecFailure(t *testing.T) {
   275  	t.Parallel()
   276  
   277  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   278  	defer cancel()
   279  
   280  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   281  		if _, err := conn.Exec(context.Background(), "selct;"); err == nil {
   282  			t.Fatal("Expected SQL syntax error")
   283  		}
   284  
   285  		rows, _ := conn.Query(context.Background(), "select 1")
   286  		rows.Close()
   287  		if rows.Err() != nil {
   288  			t.Fatalf("Exec failure appears to have broken connection: %v", rows.Err())
   289  		}
   290  	})
   291  }
   292  
   293  func TestExecFailureWithArguments(t *testing.T) {
   294  	t.Parallel()
   295  
   296  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   297  	defer cancel()
   298  
   299  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   300  		_, err := conn.Exec(context.Background(), "selct $1;", 1)
   301  		if err == nil {
   302  			t.Fatal("Expected SQL syntax error")
   303  		}
   304  		assert.False(t, pgconn.SafeToRetry(err))
   305  
   306  		_, err = conn.Exec(context.Background(), "select $1::varchar(1);", "1", "2")
   307  		require.Error(t, err)
   308  	})
   309  }
   310  
   311  func TestExecContextWithoutCancelation(t *testing.T) {
   312  	t.Parallel()
   313  
   314  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   315  	defer cancel()
   316  
   317  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   318  		ctx, cancelFunc := context.WithCancel(ctx)
   319  		defer cancelFunc()
   320  
   321  		commandTag, err := conn.Exec(ctx, "create temporary table foo(id integer primary key);")
   322  		if err != nil {
   323  			t.Fatal(err)
   324  		}
   325  		if commandTag.String() != "CREATE TABLE" {
   326  			t.Fatalf("Unexpected results from Exec: %v", commandTag)
   327  		}
   328  		assert.False(t, pgconn.SafeToRetry(err))
   329  	})
   330  }
   331  
   332  func TestExecContextFailureWithoutCancelation(t *testing.T) {
   333  	t.Parallel()
   334  
   335  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   336  	defer cancel()
   337  
   338  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   339  		ctx, cancelFunc := context.WithCancel(ctx)
   340  		defer cancelFunc()
   341  
   342  		_, err := conn.Exec(ctx, "selct;")
   343  		if err == nil {
   344  			t.Fatal("Expected SQL syntax error")
   345  		}
   346  		assert.False(t, pgconn.SafeToRetry(err))
   347  
   348  		rows, _ := conn.Query(context.Background(), "select 1")
   349  		rows.Close()
   350  		if rows.Err() != nil {
   351  			t.Fatalf("ExecEx failure appears to have broken connection: %v", rows.Err())
   352  		}
   353  		assert.False(t, pgconn.SafeToRetry(err))
   354  	})
   355  }
   356  
   357  func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
   358  	t.Parallel()
   359  
   360  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   361  	defer cancel()
   362  
   363  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   364  		ctx, cancelFunc := context.WithCancel(ctx)
   365  		defer cancelFunc()
   366  
   367  		_, err := conn.Exec(ctx, "selct $1;", 1)
   368  		if err == nil {
   369  			t.Fatal("Expected SQL syntax error")
   370  		}
   371  		assert.False(t, pgconn.SafeToRetry(err))
   372  	})
   373  }
   374  
   375  func TestExecFailureCloseBefore(t *testing.T) {
   376  	t.Parallel()
   377  
   378  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   379  	closeConn(t, conn)
   380  
   381  	_, err := conn.Exec(context.Background(), "select 1")
   382  	require.Error(t, err)
   383  	assert.True(t, pgconn.SafeToRetry(err))
   384  }
   385  
   386  func TestExecPerQuerySimpleProtocol(t *testing.T) {
   387  	t.Parallel()
   388  
   389  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   390  	defer closeConn(t, conn)
   391  
   392  	ctx, cancelFunc := context.WithCancel(context.Background())
   393  	defer cancelFunc()
   394  
   395  	commandTag, err := conn.Exec(ctx, "create temporary table foo(name varchar primary key);")
   396  	if err != nil {
   397  		t.Fatal(err)
   398  	}
   399  	if commandTag.String() != "CREATE TABLE" {
   400  		t.Fatalf("Unexpected results from Exec: %v", commandTag)
   401  	}
   402  
   403  	commandTag, err = conn.Exec(ctx,
   404  		"insert into foo(name) values($1);",
   405  		pgx.QueryExecModeSimpleProtocol,
   406  		"bar'; drop table foo;--",
   407  	)
   408  	if err != nil {
   409  		t.Fatal(err)
   410  	}
   411  	if commandTag.String() != "INSERT 0 1" {
   412  		t.Fatalf("Unexpected results from Exec: %v", commandTag)
   413  	}
   414  
   415  }
   416  
   417  func TestPrepare(t *testing.T) {
   418  	t.Parallel()
   419  
   420  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   421  	defer closeConn(t, conn)
   422  
   423  	_, err := conn.Prepare(context.Background(), "test", "select $1::varchar")
   424  	if err != nil {
   425  		t.Errorf("Unable to prepare statement: %v", err)
   426  		return
   427  	}
   428  
   429  	var s string
   430  	err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
   431  	if err != nil {
   432  		t.Errorf("Executing prepared statement failed: %v", err)
   433  	}
   434  
   435  	if s != "hello" {
   436  		t.Errorf("Prepared statement did not return expected value: %v", s)
   437  	}
   438  
   439  	err = conn.Deallocate(context.Background(), "test")
   440  	if err != nil {
   441  		t.Errorf("conn.Deallocate failed: %v", err)
   442  	}
   443  
   444  	// Create another prepared statement to ensure Deallocate left the connection
   445  	// in a working state and that we can reuse the prepared statement name.
   446  
   447  	_, err = conn.Prepare(context.Background(), "test", "select $1::integer")
   448  	if err != nil {
   449  		t.Errorf("Unable to prepare statement: %v", err)
   450  		return
   451  	}
   452  
   453  	var n int32
   454  	err = conn.QueryRow(context.Background(), "test", int32(1)).Scan(&n)
   455  	if err != nil {
   456  		t.Errorf("Executing prepared statement failed: %v", err)
   457  	}
   458  
   459  	if n != 1 {
   460  		t.Errorf("Prepared statement did not return expected value: %v", s)
   461  	}
   462  
   463  	err = conn.DeallocateAll(context.Background())
   464  	if err != nil {
   465  		t.Errorf("conn.Deallocate failed: %v", err)
   466  	}
   467  }
   468  
   469  func TestPrepareBadSQLFailure(t *testing.T) {
   470  	t.Parallel()
   471  
   472  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   473  	defer closeConn(t, conn)
   474  
   475  	if _, err := conn.Prepare(context.Background(), "badSQL", "select foo"); err == nil {
   476  		t.Fatal("Prepare should have failed with syntax error")
   477  	}
   478  
   479  	ensureConnValid(t, conn)
   480  }
   481  
   482  func TestPrepareIdempotency(t *testing.T) {
   483  	t.Parallel()
   484  
   485  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   486  	defer cancel()
   487  
   488  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   489  		for i := 0; i < 2; i++ {
   490  			_, err := conn.Prepare(context.Background(), "test", "select 42::integer")
   491  			if err != nil {
   492  				t.Fatalf("%d. Unable to prepare statement: %v", i, err)
   493  			}
   494  
   495  			var n int32
   496  			err = conn.QueryRow(context.Background(), "test").Scan(&n)
   497  			if err != nil {
   498  				t.Errorf("%d. Executing prepared statement failed: %v", i, err)
   499  			}
   500  
   501  			if n != int32(42) {
   502  				t.Errorf("%d. Prepared statement did not return expected value: %v", i, n)
   503  			}
   504  		}
   505  
   506  		_, err := conn.Prepare(context.Background(), "test", "select 'fail'::varchar")
   507  		if err == nil {
   508  			t.Fatalf("Prepare statement with same name but different SQL should have failed but it didn't")
   509  			return
   510  		}
   511  	})
   512  }
   513  
   514  func TestPrepareStatementCacheModes(t *testing.T) {
   515  	t.Parallel()
   516  
   517  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   518  	defer cancel()
   519  
   520  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   521  		_, err := conn.Prepare(context.Background(), "test", "select $1::text")
   522  		require.NoError(t, err)
   523  
   524  		var s string
   525  		err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
   526  		require.NoError(t, err)
   527  		require.Equal(t, "hello", s)
   528  	})
   529  }
   530  
   531  func TestPrepareWithDigestedName(t *testing.T) {
   532  	t.Parallel()
   533  
   534  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   535  	defer cancel()
   536  
   537  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   538  		sql := "select $1::text"
   539  		sd, err := conn.Prepare(ctx, sql, sql)
   540  		require.NoError(t, err)
   541  		require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
   542  
   543  		var s string
   544  		err = conn.QueryRow(ctx, sql, "hello").Scan(&s)
   545  		require.NoError(t, err)
   546  		require.Equal(t, "hello", s)
   547  
   548  		err = conn.Deallocate(ctx, sql)
   549  		require.NoError(t, err)
   550  	})
   551  }
   552  
   553  // https://github.com/jackc/pgx/pull/1795
   554  func TestDeallocateInAbortedTransaction(t *testing.T) {
   555  	t.Parallel()
   556  
   557  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   558  	defer cancel()
   559  
   560  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   561  		tx, err := conn.Begin(ctx)
   562  		require.NoError(t, err)
   563  
   564  		sql := "select $1::text"
   565  		sd, err := tx.Prepare(ctx, sql, sql)
   566  		require.NoError(t, err)
   567  		require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
   568  
   569  		var s string
   570  		err = tx.QueryRow(ctx, sql, "hello").Scan(&s)
   571  		require.NoError(t, err)
   572  		require.Equal(t, "hello", s)
   573  
   574  		_, err = tx.Exec(ctx, "select 1/0") // abort transaction with divide by zero error
   575  		require.Error(t, err)
   576  
   577  		err = conn.Deallocate(ctx, sql)
   578  		require.NoError(t, err)
   579  
   580  		err = tx.Rollback(ctx)
   581  		require.NoError(t, err)
   582  
   583  		sd, err = conn.Prepare(ctx, sql, sql)
   584  		require.NoError(t, err)
   585  		require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
   586  	})
   587  }
   588  
   589  func TestDeallocateMissingPreparedStatementStillClearsFromPreparedStatementMap(t *testing.T) {
   590  	t.Parallel()
   591  
   592  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   593  	defer cancel()
   594  
   595  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   596  		_, err := conn.Prepare(ctx, "ps", "select $1::text")
   597  		require.NoError(t, err)
   598  
   599  		_, err = conn.Exec(ctx, "deallocate ps")
   600  		require.NoError(t, err)
   601  
   602  		err = conn.Deallocate(ctx, "ps")
   603  		require.NoError(t, err)
   604  
   605  		_, err = conn.Prepare(ctx, "ps", "select $1::text, $2::text")
   606  		require.NoError(t, err)
   607  
   608  		var s1, s2 string
   609  		err = conn.QueryRow(ctx, "ps", "hello", "world").Scan(&s1, &s2)
   610  		require.NoError(t, err)
   611  		require.Equal(t, "hello", s1)
   612  		require.Equal(t, "world", s2)
   613  	})
   614  }
   615  
   616  func TestListenNotify(t *testing.T) {
   617  	t.Parallel()
   618  
   619  	listener := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   620  	defer closeConn(t, listener)
   621  
   622  	if listener.PgConn().ParameterStatus("crdb_version") != "" {
   623  		t.Skip("Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   624  	}
   625  
   626  	mustExec(t, listener, "listen chat")
   627  
   628  	notifier := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   629  	defer closeConn(t, notifier)
   630  
   631  	mustExec(t, notifier, "notify chat")
   632  
   633  	// when notification is waiting on the socket to be read
   634  	notification, err := listener.WaitForNotification(context.Background())
   635  	require.NoError(t, err)
   636  	assert.Equal(t, "chat", notification.Channel)
   637  
   638  	// when notification has already been read during previous query
   639  	mustExec(t, notifier, "notify chat")
   640  	rows, _ := listener.Query(context.Background(), "select 1")
   641  	rows.Close()
   642  	require.NoError(t, rows.Err())
   643  
   644  	ctx, cancelFn := context.WithCancel(context.Background())
   645  	cancelFn()
   646  	notification, err = listener.WaitForNotification(ctx)
   647  	require.NoError(t, err)
   648  	assert.Equal(t, "chat", notification.Channel)
   649  
   650  	// when timeout occurs
   651  	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
   652  	defer cancel()
   653  	notification, err = listener.WaitForNotification(ctx)
   654  	assert.True(t, pgconn.Timeout(err))
   655  	assert.Nil(t, notification)
   656  
   657  	// listener can listen again after a timeout
   658  	mustExec(t, notifier, "notify chat")
   659  	notification, err = listener.WaitForNotification(context.Background())
   660  	require.NoError(t, err)
   661  	assert.Equal(t, "chat", notification.Channel)
   662  }
   663  
   664  func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
   665  	t.Parallel()
   666  
   667  	func() {
   668  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   669  		defer closeConn(t, conn)
   670  		pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   671  	}()
   672  
   673  	listenerDone := make(chan bool)
   674  	notifierDone := make(chan bool)
   675  	listening := make(chan bool)
   676  	go func() {
   677  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   678  		defer closeConn(t, conn)
   679  		defer func() {
   680  			listenerDone <- true
   681  		}()
   682  
   683  		mustExec(t, conn, "listen busysafe")
   684  		listening <- true
   685  
   686  		for i := 0; i < 5000; i++ {
   687  			var sum int32
   688  			var rowCount int32
   689  
   690  			rows, err := conn.Query(context.Background(), "select generate_series(1,$1)", 100)
   691  			if err != nil {
   692  				t.Errorf("conn.Query failed: %v", err)
   693  				return
   694  			}
   695  
   696  			for rows.Next() {
   697  				var n int32
   698  				if err := rows.Scan(&n); err != nil {
   699  					t.Errorf("Row scan failed: %v", err)
   700  					return
   701  				}
   702  				sum += n
   703  				rowCount++
   704  			}
   705  
   706  			if rows.Err() != nil {
   707  				t.Errorf("conn.Query failed: %v", rows.Err())
   708  				return
   709  			}
   710  
   711  			if sum != 5050 {
   712  				t.Errorf("Wrong rows sum: %v", sum)
   713  				return
   714  			}
   715  
   716  			if rowCount != 100 {
   717  				t.Errorf("Wrong number of rows: %v", rowCount)
   718  				return
   719  			}
   720  		}
   721  	}()
   722  
   723  	go func() {
   724  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   725  		defer closeConn(t, conn)
   726  		defer func() {
   727  			notifierDone <- true
   728  		}()
   729  
   730  		<-listening
   731  
   732  		for i := 0; i < 100000; i++ {
   733  			mustExec(t, conn, "notify busysafe, 'hello'")
   734  		}
   735  	}()
   736  
   737  	<-listenerDone
   738  	<-notifierDone
   739  }
   740  
   741  func TestListenNotifySelfNotification(t *testing.T) {
   742  	t.Parallel()
   743  
   744  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   745  	defer closeConn(t, conn)
   746  
   747  	pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   748  
   749  	mustExec(t, conn, "listen self")
   750  
   751  	// Notify self and WaitForNotification immediately
   752  	mustExec(t, conn, "notify self")
   753  
   754  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   755  	defer cancel()
   756  	notification, err := conn.WaitForNotification(ctx)
   757  	require.NoError(t, err)
   758  	assert.Equal(t, "self", notification.Channel)
   759  
   760  	// Notify self and do something else before WaitForNotification
   761  	mustExec(t, conn, "notify self")
   762  
   763  	rows, _ := conn.Query(context.Background(), "select 1")
   764  	rows.Close()
   765  	if rows.Err() != nil {
   766  		t.Fatalf("Unexpected error on Query: %v", rows.Err())
   767  	}
   768  
   769  	ctx, cncl := context.WithTimeout(context.Background(), time.Second)
   770  	defer cncl()
   771  	notification, err = conn.WaitForNotification(ctx)
   772  	require.NoError(t, err)
   773  	assert.Equal(t, "self", notification.Channel)
   774  }
   775  
   776  func TestFatalRxError(t *testing.T) {
   777  	t.Parallel()
   778  
   779  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   780  	defer closeConn(t, conn)
   781  
   782  	pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
   783  
   784  	var wg sync.WaitGroup
   785  	wg.Add(1)
   786  	go func() {
   787  		defer wg.Done()
   788  		var n int32
   789  		var s string
   790  		err := conn.QueryRow(context.Background(), "select 1::int4, pg_sleep(10)::varchar").Scan(&n, &s)
   791  		if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Severity == "FATAL" {
   792  		} else {
   793  			t.Errorf("Expected QueryRow Scan to return fatal PgError, but instead received %v", err)
   794  			return
   795  		}
   796  	}()
   797  
   798  	otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   799  	defer otherConn.Close(context.Background())
   800  
   801  	if _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID()); err != nil {
   802  		t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
   803  	}
   804  
   805  	wg.Wait()
   806  
   807  	if !conn.IsClosed() {
   808  		t.Fatal("Connection should be closed")
   809  	}
   810  }
   811  
   812  func TestFatalTxError(t *testing.T) {
   813  	t.Parallel()
   814  
   815  	// Run timing sensitive test many times
   816  	for i := 0; i < 50; i++ {
   817  		func() {
   818  			conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   819  			defer closeConn(t, conn)
   820  
   821  			pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
   822  
   823  			otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   824  			defer otherConn.Close(context.Background())
   825  
   826  			_, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
   827  			if err != nil {
   828  				t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
   829  			}
   830  
   831  			err = conn.QueryRow(context.Background(), "select 1").Scan(nil)
   832  			if err == nil {
   833  				t.Fatal("Expected error but none occurred")
   834  			}
   835  
   836  			if !conn.IsClosed() {
   837  				t.Fatalf("Connection should be closed but isn't. Previous Query err: %v", err)
   838  			}
   839  		}()
   840  	}
   841  }
   842  
   843  func TestInsertBoolArray(t *testing.T) {
   844  	t.Parallel()
   845  
   846  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   847  	defer cancel()
   848  
   849  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   850  		if results := mustExec(t, conn, "create temporary table foo(spice bool[]);"); results.String() != "CREATE TABLE" {
   851  			t.Error("Unexpected results from Exec")
   852  		}
   853  
   854  		// Accept parameters
   855  		if results := mustExec(t, conn, "insert into foo(spice) values($1)", []bool{true, false, true}); results.String() != "INSERT 0 1" {
   856  			t.Errorf("Unexpected results from Exec: %v", results)
   857  		}
   858  	})
   859  }
   860  
   861  func TestInsertTimestampArray(t *testing.T) {
   862  	t.Parallel()
   863  
   864  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   865  	defer cancel()
   866  
   867  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   868  		if results := mustExec(t, conn, "create temporary table foo(spice timestamp[]);"); results.String() != "CREATE TABLE" {
   869  			t.Error("Unexpected results from Exec")
   870  		}
   871  
   872  		// Accept parameters
   873  		if results := mustExec(t, conn, "insert into foo(spice) values($1)", []time.Time{time.Unix(1419143667, 0), time.Unix(1419143672, 0)}); results.String() != "INSERT 0 1" {
   874  			t.Errorf("Unexpected results from Exec: %v", results)
   875  		}
   876  	})
   877  }
   878  
   879  func TestIdentifierSanitize(t *testing.T) {
   880  	t.Parallel()
   881  
   882  	tests := []struct {
   883  		ident    pgx.Identifier
   884  		expected string
   885  	}{
   886  		{
   887  			ident:    pgx.Identifier{`foo`},
   888  			expected: `"foo"`,
   889  		},
   890  		{
   891  			ident:    pgx.Identifier{`select`},
   892  			expected: `"select"`,
   893  		},
   894  		{
   895  			ident:    pgx.Identifier{`foo`, `bar`},
   896  			expected: `"foo"."bar"`,
   897  		},
   898  		{
   899  			ident:    pgx.Identifier{`you should " not do this`},
   900  			expected: `"you should "" not do this"`,
   901  		},
   902  		{
   903  			ident:    pgx.Identifier{`you should " not do this`, `please don't`},
   904  			expected: `"you should "" not do this"."please don't"`,
   905  		},
   906  		{
   907  			ident:    pgx.Identifier{`you should ` + string([]byte{0}) + `not do this`},
   908  			expected: `"you should not do this"`,
   909  		},
   910  	}
   911  
   912  	for i, tt := range tests {
   913  		qval := tt.ident.Sanitize()
   914  		if qval != tt.expected {
   915  			t.Errorf("%d. Expected Sanitize %v to return %v but it was %v", i, tt.ident, tt.expected, qval)
   916  		}
   917  	}
   918  }
   919  
   920  func TestConnInitTypeMap(t *testing.T) {
   921  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   922  	defer closeConn(t, conn)
   923  
   924  	// spot check that the standard postgres type names aren't qualified
   925  	nameOIDs := map[string]uint32{
   926  		"_int8": pgtype.Int8ArrayOID,
   927  		"int8":  pgtype.Int8OID,
   928  		"json":  pgtype.JSONOID,
   929  		"text":  pgtype.TextOID,
   930  	}
   931  	for name, oid := range nameOIDs {
   932  		dtByName, ok := conn.TypeMap().TypeForName(name)
   933  		if !ok {
   934  			t.Fatalf("Expected type named %v to be present", name)
   935  		}
   936  		dtByOID, ok := conn.TypeMap().TypeForOID(oid)
   937  		if !ok {
   938  			t.Fatalf("Expected type OID %v to be present", oid)
   939  		}
   940  		if dtByName != dtByOID {
   941  			t.Fatalf("Expected type named %v to be the same as type OID %v", name, oid)
   942  		}
   943  	}
   944  
   945  	ensureConnValid(t, conn)
   946  }
   947  
   948  func TestUnregisteredTypeUsableAsStringArgumentAndBaseResult(t *testing.T) {
   949  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   950  	defer cancel()
   951  
   952  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   953  		pgxtest.SkipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
   954  
   955  		var n uint64
   956  		err := conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
   957  		if err != nil {
   958  			t.Fatal(err)
   959  		}
   960  
   961  		if n != 42 {
   962  			t.Fatalf("Expected n to be 42, but was %v", n)
   963  		}
   964  	})
   965  }
   966  
   967  func TestDomainType(t *testing.T) {
   968  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
   969  	defer cancel()
   970  
   971  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
   972  		pgxtest.SkipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
   973  
   974  		// Domain type uint64 is a PostgreSQL domain of underlying type numeric.
   975  
   976  		// In the extended protocol preparing "select $1::uint64" appears to create a statement that expects a param OID of
   977  		// uint64 but a result OID of the underlying numeric.
   978  
   979  		var s string
   980  		err := conn.QueryRow(ctx, "select $1::uint64", "24").Scan(&s)
   981  		require.NoError(t, err)
   982  		require.Equal(t, "24", s)
   983  
   984  		// Register type
   985  		uint64Type, err := conn.LoadType(ctx, "uint64")
   986  		require.NoError(t, err)
   987  		conn.TypeMap().RegisterType(uint64Type)
   988  
   989  		var n uint64
   990  		err = conn.QueryRow(ctx, "select $1::uint64", uint64(24)).Scan(&n)
   991  		require.NoError(t, err)
   992  
   993  		// String is still an acceptable argument after registration
   994  		err = conn.QueryRow(ctx, "select $1::uint64", "7").Scan(&n)
   995  		if err != nil {
   996  			t.Fatal(err)
   997  		}
   998  		if n != 7 {
   999  			t.Fatalf("Expected n to be 7, but was %v", n)
  1000  		}
  1001  	})
  1002  }
  1003  
  1004  func TestLoadTypeSameNameInDifferentSchemas(t *testing.T) {
  1005  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1006  	defer cancel()
  1007  
  1008  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1009  		pgxtest.SkipCockroachDB(t, conn, "Server does support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")
  1010  
  1011  		tx, err := conn.Begin(ctx)
  1012  		require.NoError(t, err)
  1013  		defer tx.Rollback(ctx)
  1014  
  1015  		_, err = tx.Exec(ctx, `create schema pgx_a;
  1016  create type pgx_a.point as (a text, b text);
  1017  create schema pgx_b;
  1018  create type pgx_b.point as (c text);
  1019  `)
  1020  		require.NoError(t, err)
  1021  
  1022  		// Register types
  1023  		for _, typename := range []string{"pgx_a.point", "pgx_b.point"} {
  1024  			// Obviously using conn while a tx is in use and registering a type after the connection has been established are
  1025  			// really bad practices, but for the sake of convenience we do it in the test here.
  1026  			dt, err := conn.LoadType(ctx, typename)
  1027  			require.NoError(t, err)
  1028  			conn.TypeMap().RegisterType(dt)
  1029  		}
  1030  
  1031  		type aPoint struct {
  1032  			A string
  1033  			B string
  1034  		}
  1035  
  1036  		type bPoint struct {
  1037  			C string
  1038  		}
  1039  
  1040  		var a aPoint
  1041  		var b bPoint
  1042  		err = tx.QueryRow(ctx, `select '(foo,bar)'::pgx_a.point, '(baz)'::pgx_b.point`).Scan(&a, &b)
  1043  		require.NoError(t, err)
  1044  		require.Equal(t, aPoint{"foo", "bar"}, a)
  1045  		require.Equal(t, bPoint{"baz"}, b)
  1046  	})
  1047  }
  1048  
  1049  func TestLoadCompositeType(t *testing.T) {
  1050  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1051  	defer cancel()
  1052  
  1053  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1054  		pgxtest.SkipCockroachDB(t, conn, "Server does support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")
  1055  
  1056  		tx, err := conn.Begin(ctx)
  1057  		require.NoError(t, err)
  1058  		defer tx.Rollback(ctx)
  1059  
  1060  		_, err = tx.Exec(ctx, "create type compositetype as (attr1 int, attr2 int)")
  1061  		require.NoError(t, err)
  1062  
  1063  		_, err = tx.Exec(ctx, "alter type compositetype drop attribute attr1")
  1064  		require.NoError(t, err)
  1065  
  1066  		_, err = conn.LoadType(ctx, "compositetype")
  1067  		require.NoError(t, err)
  1068  	})
  1069  }
  1070  
  1071  func TestLoadRangeType(t *testing.T) {
  1072  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1073  	defer cancel()
  1074  
  1075  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1076  		pgxtest.SkipCockroachDB(t, conn, "Server does support range types")
  1077  
  1078  		tx, err := conn.Begin(ctx)
  1079  		require.NoError(t, err)
  1080  		defer tx.Rollback(ctx)
  1081  
  1082  		_, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi)")
  1083  		require.NoError(t, err)
  1084  
  1085  		// Register types
  1086  		newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
  1087  		require.NoError(t, err)
  1088  		conn.TypeMap().RegisterType(newRangeType)
  1089  		conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")
  1090  
  1091  		var inputRangeType = pgtype.Range[float64]{
  1092  			Lower:     1.0,
  1093  			Upper:     2.0,
  1094  			LowerType: pgtype.Inclusive,
  1095  			UpperType: pgtype.Inclusive,
  1096  			Valid:     true,
  1097  		}
  1098  		var outputRangeType pgtype.Range[float64]
  1099  		err = tx.QueryRow(ctx, "SELECT $1::examplefloatrange", inputRangeType).Scan(&outputRangeType)
  1100  		require.NoError(t, err)
  1101  		require.Equal(t, inputRangeType, outputRangeType)
  1102  	})
  1103  }
  1104  
  1105  func TestLoadMultiRangeType(t *testing.T) {
  1106  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1107  	defer cancel()
  1108  
  1109  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1110  		pgxtest.SkipCockroachDB(t, conn, "Server does support range types")
  1111  		pgxtest.SkipPostgreSQLVersionLessThan(t, conn, 14) // multirange data type was added in 14 postgresql
  1112  
  1113  		tx, err := conn.Begin(ctx)
  1114  		require.NoError(t, err)
  1115  		defer tx.Rollback(ctx)
  1116  
  1117  		_, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi, multirange_type_name=examplefloatmultirange)")
  1118  		require.NoError(t, err)
  1119  
  1120  		// Register types
  1121  		newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
  1122  		require.NoError(t, err)
  1123  		conn.TypeMap().RegisterType(newRangeType)
  1124  		conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")
  1125  
  1126  		newMultiRangeType, err := conn.LoadType(ctx, "examplefloatmultirange")
  1127  		require.NoError(t, err)
  1128  		conn.TypeMap().RegisterType(newMultiRangeType)
  1129  		conn.TypeMap().RegisterDefaultPgType(pgtype.Multirange[pgtype.Range[float64]]{}, "examplefloatmultirange")
  1130  
  1131  		var inputMultiRangeType = pgtype.Multirange[pgtype.Range[float64]]{
  1132  			{
  1133  				Lower:     1.0,
  1134  				Upper:     2.0,
  1135  				LowerType: pgtype.Inclusive,
  1136  				UpperType: pgtype.Inclusive,
  1137  				Valid:     true,
  1138  			},
  1139  			{
  1140  				Lower:     3.0,
  1141  				Upper:     4.0,
  1142  				LowerType: pgtype.Exclusive,
  1143  				UpperType: pgtype.Exclusive,
  1144  				Valid:     true,
  1145  			},
  1146  		}
  1147  		var outputMultiRangeType pgtype.Multirange[pgtype.Range[float64]]
  1148  		err = tx.QueryRow(ctx, "SELECT $1::examplefloatmultirange", inputMultiRangeType).Scan(&outputMultiRangeType)
  1149  		require.NoError(t, err)
  1150  		require.Equal(t, inputMultiRangeType, outputMultiRangeType)
  1151  	})
  1152  }
  1153  
  1154  func TestStmtCacheInvalidationConn(t *testing.T) {
  1155  	ctx := context.Background()
  1156  
  1157  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
  1158  	defer closeConn(t, conn)
  1159  
  1160  	// create a table and fill it with some data
  1161  	_, err := conn.Exec(ctx, `
  1162          DROP TABLE IF EXISTS drop_cols;
  1163          CREATE TABLE drop_cols (
  1164              id SERIAL PRIMARY KEY NOT NULL,
  1165              f1 int NOT NULL,
  1166              f2 int NOT NULL
  1167          );
  1168      `)
  1169  	require.NoError(t, err)
  1170  	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
  1171  	require.NoError(t, err)
  1172  
  1173  	getSQL := "SELECT * FROM drop_cols WHERE id = $1"
  1174  
  1175  	// This query will populate the statement cache. We don't care about the result.
  1176  	rows, err := conn.Query(ctx, getSQL, 1)
  1177  	require.NoError(t, err)
  1178  	rows.Close()
  1179  	require.NoError(t, rows.Err())
  1180  
  1181  	// Now, change the schema of the table out from under the statement, making it invalid.
  1182  	_, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
  1183  	require.NoError(t, err)
  1184  
  1185  	// We must get an error the first time we try to re-execute a bad statement.
  1186  	// It is up to the application to determine if it wants to try again. We punt to
  1187  	// the application because there is no clear recovery path in the case of failed transactions
  1188  	// or batch operations and because automatic retry is tricky and we don't want to get
  1189  	// it wrong at such an importaint layer of the stack.
  1190  	rows, err = conn.Query(ctx, getSQL, 1)
  1191  	require.NoError(t, err)
  1192  	rows.Next()
  1193  	nextErr := rows.Err()
  1194  	rows.Close()
  1195  	for _, err := range []error{nextErr, rows.Err()} {
  1196  		if err == nil {
  1197  			t.Fatal(`expected "cached plan must not change result type": no error`)
  1198  		}
  1199  		if !strings.Contains(err.Error(), "cached plan must not change result type") {
  1200  			t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
  1201  		}
  1202  	}
  1203  
  1204  	// On retry, the statement should have been flushed from the cache.
  1205  	rows, err = conn.Query(ctx, getSQL, 1)
  1206  	require.NoError(t, err)
  1207  	rows.Next()
  1208  	err = rows.Err()
  1209  	require.NoError(t, err)
  1210  	rows.Close()
  1211  	require.NoError(t, rows.Err())
  1212  
  1213  	ensureConnValid(t, conn)
  1214  }
  1215  
  1216  func TestStmtCacheInvalidationTx(t *testing.T) {
  1217  	ctx := context.Background()
  1218  
  1219  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
  1220  	defer closeConn(t, conn)
  1221  
  1222  	if conn.PgConn().ParameterStatus("crdb_version") != "" {
  1223  		t.Skip("Server has non-standard prepare in errored transaction behavior (https://github.com/cockroachdb/cockroach/issues/84140)")
  1224  	}
  1225  
  1226  	// create a table and fill it with some data
  1227  	_, err := conn.Exec(ctx, `
  1228          DROP TABLE IF EXISTS drop_cols;
  1229          CREATE TABLE drop_cols (
  1230              id SERIAL PRIMARY KEY NOT NULL,
  1231              f1 int NOT NULL,
  1232              f2 int NOT NULL
  1233          );
  1234      `)
  1235  	require.NoError(t, err)
  1236  	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
  1237  	require.NoError(t, err)
  1238  
  1239  	tx, err := conn.Begin(ctx)
  1240  	require.NoError(t, err)
  1241  
  1242  	getSQL := "SELECT * FROM drop_cols WHERE id = $1"
  1243  
  1244  	// This query will populate the statement cache. We don't care about the result.
  1245  	rows, err := tx.Query(ctx, getSQL, 1)
  1246  	require.NoError(t, err)
  1247  	rows.Close()
  1248  	require.NoError(t, rows.Err())
  1249  
  1250  	// Now, change the schema of the table out from under the statement, making it invalid.
  1251  	_, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
  1252  	require.NoError(t, err)
  1253  
  1254  	// We must get an error the first time we try to re-execute a bad statement.
  1255  	// It is up to the application to determine if it wants to try again. We punt to
  1256  	// the application because there is no clear recovery path in the case of failed transactions
  1257  	// or batch operations and because automatic retry is tricky and we don't want to get
  1258  	// it wrong at such an importaint layer of the stack.
  1259  	rows, err = tx.Query(ctx, getSQL, 1)
  1260  	require.NoError(t, err)
  1261  	rows.Next()
  1262  	nextErr := rows.Err()
  1263  	rows.Close()
  1264  	for _, err := range []error{nextErr, rows.Err()} {
  1265  		if err == nil {
  1266  			t.Fatal(`expected "cached plan must not change result type": no error`)
  1267  		}
  1268  		if !strings.Contains(err.Error(), "cached plan must not change result type") {
  1269  			t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
  1270  		}
  1271  	}
  1272  
  1273  	rows, _ = tx.Query(ctx, getSQL, 1)
  1274  	rows.Close()
  1275  	err = rows.Err()
  1276  	// Retries within the same transaction are errors (really anything except a rollback
  1277  	// will be an error in this transaction).
  1278  	require.Error(t, err)
  1279  	rows.Close()
  1280  
  1281  	err = tx.Rollback(ctx)
  1282  	require.NoError(t, err)
  1283  
  1284  	// once we've rolled back, retries will work
  1285  	rows, err = conn.Query(ctx, getSQL, 1)
  1286  	require.NoError(t, err)
  1287  	rows.Next()
  1288  	err = rows.Err()
  1289  	require.NoError(t, err)
  1290  	rows.Close()
  1291  
  1292  	ensureConnValid(t, conn)
  1293  }
  1294  
  1295  func TestInsertDurationInterval(t *testing.T) {
  1296  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1297  	defer cancel()
  1298  
  1299  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1300  		_, err := conn.Exec(context.Background(), "create temporary table t(duration INTERVAL(0) NOT NULL)")
  1301  		require.NoError(t, err)
  1302  
  1303  		result, err := conn.Exec(context.Background(), "insert into t(duration) values($1)", time.Minute)
  1304  		require.NoError(t, err)
  1305  
  1306  		n := result.RowsAffected()
  1307  		require.EqualValues(t, 1, n)
  1308  	})
  1309  }
  1310  
  1311  func TestRawValuesUnderlyingMemoryReused(t *testing.T) {
  1312  	defaultConnTestRunner.RunTest(context.Background(), t, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1313  		var buf []byte
  1314  
  1315  		rows, err := conn.Query(ctx, `select 1::int`)
  1316  		require.NoError(t, err)
  1317  
  1318  		for rows.Next() {
  1319  			buf = rows.RawValues()[0]
  1320  		}
  1321  
  1322  		require.NoError(t, rows.Err())
  1323  
  1324  		original := make([]byte, len(buf))
  1325  		copy(original, buf)
  1326  
  1327  		for i := 0; i < 1_000_000; i++ {
  1328  			rows, err := conn.Query(ctx, `select $1::int`, i)
  1329  			require.NoError(t, err)
  1330  			rows.Close()
  1331  			require.NoError(t, rows.Err())
  1332  
  1333  			if !bytes.Equal(original, buf) {
  1334  				return
  1335  			}
  1336  		}
  1337  
  1338  		t.Fatal("expected buffer from RawValues to be overwritten by subsequent queries but it was not")
  1339  	})
  1340  }
  1341  
  1342  // https://github.com/jackc/pgx/issues/1847
  1343  func TestConnDeallocateInvalidatedCachedStatementsWhenCanceled(t *testing.T) {
  1344  	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
  1345  	defer cancel()
  1346  
  1347  	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
  1348  		pgxtest.SkipCockroachDB(t, conn, "CockroachDB returns decimal instead of integer for integer division")
  1349  
  1350  		var n int32
  1351  		err := conn.QueryRow(ctx, "select 1 / $1::int", 1).Scan(&n)
  1352  		require.NoError(t, err)
  1353  		require.EqualValues(t, 1, n)
  1354  
  1355  		// Divide by zero causes an error. baseRows.Close() calls Invalidate on the statement cache whenever an error was
  1356  		// encountered by the query. Use this to purposely invalidate the query. If we had access to private fields of conn
  1357  		// we could call conn.statementCache.InvalidateAll() instead.
  1358  		err = conn.QueryRow(ctx, "select 1 / $1::int", 0).Scan(&n)
  1359  		require.Error(t, err)
  1360  
  1361  		ctx2, cancel2 := context.WithCancel(ctx)
  1362  		cancel2()
  1363  		err = conn.QueryRow(ctx2, "select 1 / $1::int", 1).Scan(&n)
  1364  		require.Error(t, err)
  1365  		require.ErrorIs(t, err, context.Canceled)
  1366  
  1367  		err = conn.QueryRow(ctx, "select 1 / $1::int", 1).Scan(&n)
  1368  		require.NoError(t, err)
  1369  		require.EqualValues(t, 1, n)
  1370  	})
  1371  }
  1372  
  1373  // https://github.com/jackc/pgx/issues/1847
  1374  func TestConnDeallocateInvalidatedCachedStatementsInTransactionWithBatch(t *testing.T) {
  1375  	t.Parallel()
  1376  
  1377  	ctx := context.Background()
  1378  
  1379  	connString := os.Getenv("PGX_TEST_DATABASE")
  1380  	config := mustParseConfig(t, connString)
  1381  	config.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement
  1382  	config.StatementCacheCapacity = 2
  1383  
  1384  	conn, err := pgx.ConnectConfig(ctx, config)
  1385  	require.NoError(t, err)
  1386  
  1387  	tx, err := conn.Begin(ctx)
  1388  	require.NoError(t, err)
  1389  	defer tx.Rollback(ctx)
  1390  
  1391  	_, err = tx.Exec(ctx, "select $1::int + 1", 1)
  1392  	require.NoError(t, err)
  1393  
  1394  	_, err = tx.Exec(ctx, "select $1::int + 2", 1)
  1395  	require.NoError(t, err)
  1396  
  1397  	// This should invalidate the first cached statement.
  1398  	_, err = tx.Exec(ctx, "select $1::int + 3", 1)
  1399  	require.NoError(t, err)
  1400  
  1401  	batch := &pgx.Batch{}
  1402  	batch.Queue("select $1::int + 1", 1)
  1403  	err = tx.SendBatch(ctx, batch).Close()
  1404  	require.NoError(t, err)
  1405  
  1406  	err = tx.Rollback(ctx)
  1407  	require.NoError(t, err)
  1408  
  1409  	ensureConnValid(t, conn)
  1410  }
  1411  

View as plain text