...

Source file src/github.com/jackc/pgx/v4/conn_test.go

Documentation: github.com/jackc/pgx/v4

     1  package pgx_test
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"log"
     7  	"os"
     8  	"strings"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/jackc/pgconn"
    14  	"github.com/jackc/pgconn/stmtcache"
    15  	"github.com/jackc/pgtype"
    16  	"github.com/jackc/pgx/v4"
    17  	"github.com/stretchr/testify/assert"
    18  	"github.com/stretchr/testify/require"
    19  )
    20  
    21  func TestCrateDBConnect(t *testing.T) {
    22  	t.Parallel()
    23  
    24  	connString := os.Getenv("PGX_TEST_CRATEDB_CONN_STRING")
    25  	if connString == "" {
    26  		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_CRATEDB_CONN_STRING")
    27  	}
    28  
    29  	conn, err := pgx.Connect(context.Background(), connString)
    30  	require.Nil(t, err)
    31  	defer closeConn(t, conn)
    32  
    33  	assert.Equal(t, connString, conn.Config().ConnString())
    34  
    35  	var result int
    36  	err = conn.QueryRow(context.Background(), "select 1 +1").Scan(&result)
    37  	if err != nil {
    38  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    39  	}
    40  	if result != 2 {
    41  		t.Errorf("bad result: %d", result)
    42  	}
    43  }
    44  
    45  func TestConnect(t *testing.T) {
    46  	t.Parallel()
    47  
    48  	connString := os.Getenv("PGX_TEST_DATABASE")
    49  	config := mustParseConfig(t, connString)
    50  
    51  	conn, err := pgx.ConnectConfig(context.Background(), config)
    52  	if err != nil {
    53  		t.Fatalf("Unable to establish connection: %v", err)
    54  	}
    55  
    56  	assertConfigsEqual(t, config, conn.Config(), "Conn.Config() returns original config")
    57  
    58  	var currentDB string
    59  	err = conn.QueryRow(context.Background(), "select current_database()").Scan(&currentDB)
    60  	if err != nil {
    61  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    62  	}
    63  	if currentDB != config.Config.Database {
    64  		t.Errorf("Did not connect to specified database (%v)", config.Config.Database)
    65  	}
    66  
    67  	var user string
    68  	err = conn.QueryRow(context.Background(), "select current_user").Scan(&user)
    69  	if err != nil {
    70  		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
    71  	}
    72  	if user != config.Config.User {
    73  		t.Errorf("Did not connect as specified user (%v)", config.Config.User)
    74  	}
    75  
    76  	err = conn.Close(context.Background())
    77  	if err != nil {
    78  		t.Fatal("Unable to close connection")
    79  	}
    80  }
    81  
    82  func TestConnectWithPreferSimpleProtocol(t *testing.T) {
    83  	t.Parallel()
    84  
    85  	connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
    86  	connConfig.PreferSimpleProtocol = true
    87  
    88  	conn := mustConnect(t, connConfig)
    89  	defer closeConn(t, conn)
    90  
    91  	// If simple protocol is used we should be able to correctly scan the result
    92  	// into a pgtype.Text as the integer will have been encoded in text.
    93  
    94  	var s pgtype.Text
    95  	err := conn.QueryRow(context.Background(), "select $1::int4", 42).Scan(&s)
    96  	if err != nil {
    97  		t.Fatal(err)
    98  	}
    99  
   100  	if s.Get() != "42" {
   101  		t.Fatalf(`expected "42", got %v`, s)
   102  	}
   103  
   104  	ensureConnValid(t, conn)
   105  }
   106  
   107  func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
   108  	config := &pgx.ConnConfig{}
   109  	require.PanicsWithValue(t, "config must be created by ParseConfig", func() {
   110  		pgx.ConnectConfig(context.Background(), config)
   111  	})
   112  }
   113  
   114  func TestConfigContainsConnStr(t *testing.T) {
   115  	connStr := os.Getenv("PGX_TEST_DATABASE")
   116  	config, err := pgx.ParseConfig(connStr)
   117  	require.NoError(t, err)
   118  	assert.Equal(t, connStr, config.ConnString())
   119  }
   120  
   121  func TestConfigCopyReturnsEqualConfig(t *testing.T) {
   122  	connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
   123  	original, err := pgx.ParseConfig(connString)
   124  	require.NoError(t, err)
   125  
   126  	copied := original.Copy()
   127  	assertConfigsEqual(t, original, copied, t.Name())
   128  }
   129  
   130  func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
   131  	connString := os.Getenv("PGX_TEST_DATABASE")
   132  	original, err := pgx.ParseConfig(connString)
   133  	require.NoError(t, err)
   134  
   135  	copied := original.Copy()
   136  	assert.NotPanics(t, func() {
   137  		_, err = pgx.ConnectConfig(context.Background(), copied)
   138  	})
   139  	assert.NoError(t, err)
   140  }
   141  
   142  func TestParseConfigExtractsStatementCacheOptions(t *testing.T) {
   143  	t.Parallel()
   144  
   145  	config, err := pgx.ParseConfig("statement_cache_capacity=0")
   146  	require.NoError(t, err)
   147  	require.Nil(t, config.BuildStatementCache)
   148  
   149  	config, err = pgx.ParseConfig("statement_cache_capacity=42")
   150  	require.NoError(t, err)
   151  	require.NotNil(t, config.BuildStatementCache)
   152  	c := config.BuildStatementCache(nil)
   153  	require.NotNil(t, c)
   154  	require.Equal(t, 42, c.Cap())
   155  	require.Equal(t, stmtcache.ModePrepare, c.Mode())
   156  
   157  	config, err = pgx.ParseConfig("statement_cache_capacity=42 statement_cache_mode=prepare")
   158  	require.NoError(t, err)
   159  	require.NotNil(t, config.BuildStatementCache)
   160  	c = config.BuildStatementCache(nil)
   161  	require.NotNil(t, c)
   162  	require.Equal(t, 42, c.Cap())
   163  	require.Equal(t, stmtcache.ModePrepare, c.Mode())
   164  
   165  	config, err = pgx.ParseConfig("statement_cache_capacity=42 statement_cache_mode=describe")
   166  	require.NoError(t, err)
   167  	require.NotNil(t, config.BuildStatementCache)
   168  	c = config.BuildStatementCache(nil)
   169  	require.NotNil(t, c)
   170  	require.Equal(t, 42, c.Cap())
   171  	require.Equal(t, stmtcache.ModeDescribe, c.Mode())
   172  }
   173  
   174  func TestParseConfigExtractsPreferSimpleProtocol(t *testing.T) {
   175  	t.Parallel()
   176  
   177  	for _, tt := range []struct {
   178  		connString           string
   179  		preferSimpleProtocol bool
   180  	}{
   181  		{"", false},
   182  		{"prefer_simple_protocol=false", false},
   183  		{"prefer_simple_protocol=0", false},
   184  		{"prefer_simple_protocol=true", true},
   185  		{"prefer_simple_protocol=1", true},
   186  	} {
   187  		config, err := pgx.ParseConfig(tt.connString)
   188  		require.NoError(t, err)
   189  		require.Equalf(t, tt.preferSimpleProtocol, config.PreferSimpleProtocol, "connString: `%s`", tt.connString)
   190  		require.Empty(t, config.RuntimeParams["prefer_simple_protocol"])
   191  	}
   192  }
   193  
   194  func TestExec(t *testing.T) {
   195  	t.Parallel()
   196  
   197  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   198  		if results := mustExec(t, conn, "create temporary table foo(id integer primary key);"); string(results) != "CREATE TABLE" {
   199  			t.Error("Unexpected results from Exec")
   200  		}
   201  
   202  		// Accept parameters
   203  		if results := mustExec(t, conn, "insert into foo(id) values($1)", 1); string(results) != "INSERT 0 1" {
   204  			t.Errorf("Unexpected results from Exec: %v", results)
   205  		}
   206  
   207  		if results := mustExec(t, conn, "drop table foo;"); string(results) != "DROP TABLE" {
   208  			t.Error("Unexpected results from Exec")
   209  		}
   210  
   211  		// Multiple statements can be executed -- last command tag is returned
   212  		if results := mustExec(t, conn, "create temporary table foo(id serial primary key); drop table foo;"); string(results) != "DROP TABLE" {
   213  			t.Error("Unexpected results from Exec")
   214  		}
   215  
   216  		// Can execute longer SQL strings than sharedBufferSize
   217  		if results := mustExec(t, conn, strings.Repeat("select 42; ", 1000)); string(results) != "SELECT 1" {
   218  			t.Errorf("Unexpected results from Exec: %v", results)
   219  		}
   220  
   221  		// Exec no-op which does not return a command tag
   222  		if results := mustExec(t, conn, "--;"); string(results) != "" {
   223  			t.Errorf("Unexpected results from Exec: %v", results)
   224  		}
   225  	})
   226  }
   227  
   228  func TestExecFailure(t *testing.T) {
   229  	t.Parallel()
   230  
   231  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   232  		if _, err := conn.Exec(context.Background(), "selct;"); err == nil {
   233  			t.Fatal("Expected SQL syntax error")
   234  		}
   235  
   236  		rows, _ := conn.Query(context.Background(), "select 1")
   237  		rows.Close()
   238  		if rows.Err() != nil {
   239  			t.Fatalf("Exec failure appears to have broken connection: %v", rows.Err())
   240  		}
   241  	})
   242  }
   243  
   244  func TestExecFailureWithArguments(t *testing.T) {
   245  	t.Parallel()
   246  
   247  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   248  		_, err := conn.Exec(context.Background(), "selct $1;", 1)
   249  		if err == nil {
   250  			t.Fatal("Expected SQL syntax error")
   251  		}
   252  		assert.False(t, pgconn.SafeToRetry(err))
   253  
   254  		_, err = conn.Exec(context.Background(), "select $1::varchar(1);", "1", "2")
   255  		require.Error(t, err)
   256  	})
   257  }
   258  
   259  func TestExecContextWithoutCancelation(t *testing.T) {
   260  	t.Parallel()
   261  
   262  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   263  		ctx, cancelFunc := context.WithCancel(context.Background())
   264  		defer cancelFunc()
   265  
   266  		commandTag, err := conn.Exec(ctx, "create temporary table foo(id integer primary key);")
   267  		if err != nil {
   268  			t.Fatal(err)
   269  		}
   270  		if string(commandTag) != "CREATE TABLE" {
   271  			t.Fatalf("Unexpected results from Exec: %v", commandTag)
   272  		}
   273  		assert.False(t, pgconn.SafeToRetry(err))
   274  	})
   275  }
   276  
   277  func TestExecContextFailureWithoutCancelation(t *testing.T) {
   278  	t.Parallel()
   279  
   280  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   281  		ctx, cancelFunc := context.WithCancel(context.Background())
   282  		defer cancelFunc()
   283  
   284  		_, err := conn.Exec(ctx, "selct;")
   285  		if err == nil {
   286  			t.Fatal("Expected SQL syntax error")
   287  		}
   288  		assert.False(t, pgconn.SafeToRetry(err))
   289  
   290  		rows, _ := conn.Query(context.Background(), "select 1")
   291  		rows.Close()
   292  		if rows.Err() != nil {
   293  			t.Fatalf("ExecEx failure appears to have broken connection: %v", rows.Err())
   294  		}
   295  		assert.False(t, pgconn.SafeToRetry(err))
   296  	})
   297  }
   298  
   299  func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
   300  	t.Parallel()
   301  
   302  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   303  		ctx, cancelFunc := context.WithCancel(context.Background())
   304  		defer cancelFunc()
   305  
   306  		_, err := conn.Exec(ctx, "selct $1;", 1)
   307  		if err == nil {
   308  			t.Fatal("Expected SQL syntax error")
   309  		}
   310  		assert.False(t, pgconn.SafeToRetry(err))
   311  	})
   312  }
   313  
   314  func TestExecFailureCloseBefore(t *testing.T) {
   315  	t.Parallel()
   316  
   317  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   318  	closeConn(t, conn)
   319  
   320  	_, err := conn.Exec(context.Background(), "select 1")
   321  	require.Error(t, err)
   322  	assert.True(t, pgconn.SafeToRetry(err))
   323  }
   324  
   325  func TestExecStatementCacheModes(t *testing.T) {
   326  	t.Parallel()
   327  
   328  	config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
   329  
   330  	tests := []struct {
   331  		name                string
   332  		buildStatementCache pgx.BuildStatementCacheFunc
   333  	}{
   334  		{
   335  			name:                "disabled",
   336  			buildStatementCache: nil,
   337  		},
   338  		{
   339  			name: "prepare",
   340  			buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
   341  				return stmtcache.New(conn, stmtcache.ModePrepare, 32)
   342  			},
   343  		},
   344  		{
   345  			name: "describe",
   346  			buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
   347  				return stmtcache.New(conn, stmtcache.ModeDescribe, 32)
   348  			},
   349  		},
   350  	}
   351  
   352  	for _, tt := range tests {
   353  		func() {
   354  			config.BuildStatementCache = tt.buildStatementCache
   355  			conn := mustConnect(t, config)
   356  			defer closeConn(t, conn)
   357  
   358  			commandTag, err := conn.Exec(context.Background(), "select 1")
   359  			assert.NoError(t, err, tt.name)
   360  			assert.Equal(t, "SELECT 1", string(commandTag), tt.name)
   361  
   362  			commandTag, err = conn.Exec(context.Background(), "select 1 union all select 1")
   363  			assert.NoError(t, err, tt.name)
   364  			assert.Equal(t, "SELECT 2", string(commandTag), tt.name)
   365  
   366  			commandTag, err = conn.Exec(context.Background(), "select 1")
   367  			assert.NoError(t, err, tt.name)
   368  			assert.Equal(t, "SELECT 1", string(commandTag), tt.name)
   369  
   370  			ensureConnValid(t, conn)
   371  		}()
   372  	}
   373  }
   374  
   375  func TestExecPerQuerySimpleProtocol(t *testing.T) {
   376  	t.Parallel()
   377  
   378  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   379  	defer closeConn(t, conn)
   380  
   381  	ctx, cancelFunc := context.WithCancel(context.Background())
   382  	defer cancelFunc()
   383  
   384  	commandTag, err := conn.Exec(ctx, "create temporary table foo(name varchar primary key);")
   385  	if err != nil {
   386  		t.Fatal(err)
   387  	}
   388  	if string(commandTag) != "CREATE TABLE" {
   389  		t.Fatalf("Unexpected results from Exec: %v", commandTag)
   390  	}
   391  
   392  	commandTag, err = conn.Exec(ctx,
   393  		"insert into foo(name) values($1);",
   394  		pgx.QuerySimpleProtocol(true),
   395  		"bar'; drop table foo;--",
   396  	)
   397  	if err != nil {
   398  		t.Fatal(err)
   399  	}
   400  	if string(commandTag) != "INSERT 0 1" {
   401  		t.Fatalf("Unexpected results from Exec: %v", commandTag)
   402  	}
   403  
   404  }
   405  
   406  func TestPrepare(t *testing.T) {
   407  	t.Parallel()
   408  
   409  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   410  	defer closeConn(t, conn)
   411  
   412  	_, err := conn.Prepare(context.Background(), "test", "select $1::varchar")
   413  	if err != nil {
   414  		t.Errorf("Unable to prepare statement: %v", err)
   415  		return
   416  	}
   417  
   418  	var s string
   419  	err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
   420  	if err != nil {
   421  		t.Errorf("Executing prepared statement failed: %v", err)
   422  	}
   423  
   424  	if s != "hello" {
   425  		t.Errorf("Prepared statement did not return expected value: %v", s)
   426  	}
   427  
   428  	err = conn.Deallocate(context.Background(), "test")
   429  	if err != nil {
   430  		t.Errorf("conn.Deallocate failed: %v", err)
   431  	}
   432  
   433  	// Create another prepared statement to ensure Deallocate left the connection
   434  	// in a working state and that we can reuse the prepared statement name.
   435  
   436  	_, err = conn.Prepare(context.Background(), "test", "select $1::integer")
   437  	if err != nil {
   438  		t.Errorf("Unable to prepare statement: %v", err)
   439  		return
   440  	}
   441  
   442  	var n int32
   443  	err = conn.QueryRow(context.Background(), "test", int32(1)).Scan(&n)
   444  	if err != nil {
   445  		t.Errorf("Executing prepared statement failed: %v", err)
   446  	}
   447  
   448  	if n != 1 {
   449  		t.Errorf("Prepared statement did not return expected value: %v", s)
   450  	}
   451  
   452  	err = conn.Deallocate(context.Background(), "test")
   453  	if err != nil {
   454  		t.Errorf("conn.Deallocate failed: %v", err)
   455  	}
   456  }
   457  
   458  func TestPrepareBadSQLFailure(t *testing.T) {
   459  	t.Parallel()
   460  
   461  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   462  	defer closeConn(t, conn)
   463  
   464  	if _, err := conn.Prepare(context.Background(), "badSQL", "select foo"); err == nil {
   465  		t.Fatal("Prepare should have failed with syntax error")
   466  	}
   467  
   468  	ensureConnValid(t, conn)
   469  }
   470  
   471  func TestPrepareIdempotency(t *testing.T) {
   472  	t.Parallel()
   473  
   474  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   475  	defer closeConn(t, conn)
   476  
   477  	for i := 0; i < 2; i++ {
   478  		_, err := conn.Prepare(context.Background(), "test", "select 42::integer")
   479  		if err != nil {
   480  			t.Fatalf("%d. Unable to prepare statement: %v", i, err)
   481  		}
   482  
   483  		var n int32
   484  		err = conn.QueryRow(context.Background(), "test").Scan(&n)
   485  		if err != nil {
   486  			t.Errorf("%d. Executing prepared statement failed: %v", i, err)
   487  		}
   488  
   489  		if n != int32(42) {
   490  			t.Errorf("%d. Prepared statement did not return expected value: %v", i, n)
   491  		}
   492  	}
   493  
   494  	_, err := conn.Prepare(context.Background(), "test", "select 'fail'::varchar")
   495  	if err == nil {
   496  		t.Fatalf("Prepare statement with same name but different SQL should have failed but it didn't")
   497  		return
   498  	}
   499  }
   500  
   501  func TestPrepareStatementCacheModes(t *testing.T) {
   502  	t.Parallel()
   503  
   504  	config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
   505  
   506  	tests := []struct {
   507  		name                string
   508  		buildStatementCache pgx.BuildStatementCacheFunc
   509  	}{
   510  		{
   511  			name:                "disabled",
   512  			buildStatementCache: nil,
   513  		},
   514  		{
   515  			name: "prepare",
   516  			buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
   517  				return stmtcache.New(conn, stmtcache.ModePrepare, 32)
   518  			},
   519  		},
   520  		{
   521  			name: "describe",
   522  			buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
   523  				return stmtcache.New(conn, stmtcache.ModeDescribe, 32)
   524  			},
   525  		},
   526  	}
   527  
   528  	for _, tt := range tests {
   529  		t.Run(tt.name, func(t *testing.T) {
   530  			config.BuildStatementCache = tt.buildStatementCache
   531  			conn := mustConnect(t, config)
   532  			defer closeConn(t, conn)
   533  
   534  			_, err := conn.Prepare(context.Background(), "test", "select $1::text")
   535  			require.NoError(t, err)
   536  
   537  			var s string
   538  			err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
   539  			require.NoError(t, err)
   540  			require.Equal(t, "hello", s)
   541  		})
   542  	}
   543  }
   544  
   545  func TestListenNotify(t *testing.T) {
   546  	t.Parallel()
   547  
   548  	listener := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   549  	defer closeConn(t, listener)
   550  
   551  	if listener.PgConn().ParameterStatus("crdb_version") != "" {
   552  		t.Skip("Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   553  	}
   554  
   555  	mustExec(t, listener, "listen chat")
   556  
   557  	notifier := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   558  	defer closeConn(t, notifier)
   559  
   560  	mustExec(t, notifier, "notify chat")
   561  
   562  	// when notification is waiting on the socket to be read
   563  	notification, err := listener.WaitForNotification(context.Background())
   564  	require.NoError(t, err)
   565  	assert.Equal(t, "chat", notification.Channel)
   566  
   567  	// when notification has already been read during previous query
   568  	mustExec(t, notifier, "notify chat")
   569  	rows, _ := listener.Query(context.Background(), "select 1")
   570  	rows.Close()
   571  	require.NoError(t, rows.Err())
   572  
   573  	ctx, cancelFn := context.WithCancel(context.Background())
   574  	cancelFn()
   575  	notification, err = listener.WaitForNotification(ctx)
   576  	require.NoError(t, err)
   577  	assert.Equal(t, "chat", notification.Channel)
   578  
   579  	// when timeout occurs
   580  	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
   581  	defer cancel()
   582  	notification, err = listener.WaitForNotification(ctx)
   583  	assert.True(t, pgconn.Timeout(err))
   584  
   585  	// listener can listen again after a timeout
   586  	mustExec(t, notifier, "notify chat")
   587  	notification, err = listener.WaitForNotification(context.Background())
   588  	require.NoError(t, err)
   589  	assert.Equal(t, "chat", notification.Channel)
   590  }
   591  
   592  func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
   593  	t.Parallel()
   594  
   595  	func() {
   596  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   597  		defer closeConn(t, conn)
   598  		skipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   599  	}()
   600  
   601  	listenerDone := make(chan bool)
   602  	notifierDone := make(chan bool)
   603  	go func() {
   604  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   605  		defer closeConn(t, conn)
   606  		defer func() {
   607  			listenerDone <- true
   608  		}()
   609  
   610  		mustExec(t, conn, "listen busysafe")
   611  
   612  		for i := 0; i < 5000; i++ {
   613  			var sum int32
   614  			var rowCount int32
   615  
   616  			rows, err := conn.Query(context.Background(), "select generate_series(1,$1)", 100)
   617  			if err != nil {
   618  				t.Errorf("conn.Query failed: %v", err)
   619  				return
   620  			}
   621  
   622  			for rows.Next() {
   623  				var n int32
   624  				if err := rows.Scan(&n); err != nil {
   625  					t.Errorf("Row scan failed: %v", err)
   626  					return
   627  				}
   628  				sum += n
   629  				rowCount++
   630  			}
   631  
   632  			if rows.Err() != nil {
   633  				t.Errorf("conn.Query failed: %v", err)
   634  				return
   635  			}
   636  
   637  			if sum != 5050 {
   638  				t.Errorf("Wrong rows sum: %v", sum)
   639  				return
   640  			}
   641  
   642  			if rowCount != 100 {
   643  				t.Errorf("Wrong number of rows: %v", rowCount)
   644  				return
   645  			}
   646  
   647  			time.Sleep(1 * time.Microsecond)
   648  		}
   649  	}()
   650  
   651  	go func() {
   652  		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   653  		defer closeConn(t, conn)
   654  		defer func() {
   655  			notifierDone <- true
   656  		}()
   657  
   658  		for i := 0; i < 100000; i++ {
   659  			mustExec(t, conn, "notify busysafe, 'hello'")
   660  			time.Sleep(1 * time.Microsecond)
   661  		}
   662  	}()
   663  
   664  	<-listenerDone
   665  	<-notifierDone
   666  }
   667  
   668  func TestListenNotifySelfNotification(t *testing.T) {
   669  	t.Parallel()
   670  
   671  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   672  	defer closeConn(t, conn)
   673  
   674  	skipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
   675  
   676  	mustExec(t, conn, "listen self")
   677  
   678  	// Notify self and WaitForNotification immediately
   679  	mustExec(t, conn, "notify self")
   680  
   681  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   682  	defer cancel()
   683  	notification, err := conn.WaitForNotification(ctx)
   684  	require.NoError(t, err)
   685  	assert.Equal(t, "self", notification.Channel)
   686  
   687  	// Notify self and do something else before WaitForNotification
   688  	mustExec(t, conn, "notify self")
   689  
   690  	rows, _ := conn.Query(context.Background(), "select 1")
   691  	rows.Close()
   692  	if rows.Err() != nil {
   693  		t.Fatalf("Unexpected error on Query: %v", rows.Err())
   694  	}
   695  
   696  	ctx, cncl := context.WithTimeout(context.Background(), time.Second)
   697  	defer cncl()
   698  	notification, err = conn.WaitForNotification(ctx)
   699  	require.NoError(t, err)
   700  	assert.Equal(t, "self", notification.Channel)
   701  }
   702  
   703  func TestFatalRxError(t *testing.T) {
   704  	t.Parallel()
   705  
   706  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   707  	defer closeConn(t, conn)
   708  
   709  	skipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
   710  
   711  	var wg sync.WaitGroup
   712  	wg.Add(1)
   713  	go func() {
   714  		defer wg.Done()
   715  		var n int32
   716  		var s string
   717  		err := conn.QueryRow(context.Background(), "select 1::int4, pg_sleep(10)::varchar").Scan(&n, &s)
   718  		if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Severity == "FATAL" {
   719  		} else {
   720  			t.Errorf("Expected QueryRow Scan to return fatal PgError, but instead received %v", err)
   721  			return
   722  		}
   723  	}()
   724  
   725  	otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   726  	defer otherConn.Close(context.Background())
   727  
   728  	if _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID()); err != nil {
   729  		t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
   730  	}
   731  
   732  	wg.Wait()
   733  
   734  	if !conn.IsClosed() {
   735  		t.Fatal("Connection should be closed")
   736  	}
   737  }
   738  
   739  func TestFatalTxError(t *testing.T) {
   740  	t.Parallel()
   741  
   742  	// Run timing sensitive test many times
   743  	for i := 0; i < 50; i++ {
   744  		func() {
   745  			conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   746  			defer closeConn(t, conn)
   747  
   748  			skipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
   749  
   750  			otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   751  			defer otherConn.Close(context.Background())
   752  
   753  			_, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
   754  			if err != nil {
   755  				t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
   756  			}
   757  
   758  			err = conn.QueryRow(context.Background(), "select 1").Scan(nil)
   759  			if err == nil {
   760  				t.Fatal("Expected error but none occurred")
   761  			}
   762  
   763  			if !conn.IsClosed() {
   764  				t.Fatalf("Connection should be closed but isn't. Previous Query err: %v", err)
   765  			}
   766  		}()
   767  	}
   768  }
   769  
   770  func TestInsertBoolArray(t *testing.T) {
   771  	t.Parallel()
   772  
   773  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   774  		if results := mustExec(t, conn, "create temporary table foo(spice bool[]);"); string(results) != "CREATE TABLE" {
   775  			t.Error("Unexpected results from Exec")
   776  		}
   777  
   778  		// Accept parameters
   779  		if results := mustExec(t, conn, "insert into foo(spice) values($1)", []bool{true, false, true}); string(results) != "INSERT 0 1" {
   780  			t.Errorf("Unexpected results from Exec: %v", results)
   781  		}
   782  	})
   783  }
   784  
   785  func TestInsertTimestampArray(t *testing.T) {
   786  	t.Parallel()
   787  
   788  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   789  		if results := mustExec(t, conn, "create temporary table foo(spice timestamp[]);"); string(results) != "CREATE TABLE" {
   790  			t.Error("Unexpected results from Exec")
   791  		}
   792  
   793  		// Accept parameters
   794  		if results := mustExec(t, conn, "insert into foo(spice) values($1)", []time.Time{time.Unix(1419143667, 0), time.Unix(1419143672, 0)}); string(results) != "INSERT 0 1" {
   795  			t.Errorf("Unexpected results from Exec: %v", results)
   796  		}
   797  	})
   798  }
   799  
   800  type testLog struct {
   801  	lvl  pgx.LogLevel
   802  	msg  string
   803  	data map[string]interface{}
   804  }
   805  
   806  type testLogger struct {
   807  	logs []testLog
   808  }
   809  
   810  func (l *testLogger) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
   811  	data["ctxdata"] = ctx.Value("ctxdata")
   812  	l.logs = append(l.logs, testLog{lvl: level, msg: msg, data: data})
   813  }
   814  
   815  func TestLogPassesContext(t *testing.T) {
   816  	t.Parallel()
   817  
   818  	l1 := &testLogger{}
   819  	config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
   820  	config.Logger = l1
   821  
   822  	conn := mustConnect(t, config)
   823  	defer closeConn(t, conn)
   824  
   825  	l1.logs = l1.logs[0:0] // Clear logs written when establishing connection
   826  
   827  	ctx := context.WithValue(context.Background(), "ctxdata", "foo")
   828  
   829  	if _, err := conn.Exec(ctx, ";"); err != nil {
   830  		t.Fatal(err)
   831  	}
   832  
   833  	if len(l1.logs) != 1 {
   834  		t.Fatal("Expected logger to be called once, but it wasn't")
   835  	}
   836  
   837  	if l1.logs[0].data["ctxdata"] != "foo" {
   838  		t.Fatal("Expected context data to be passed to logger, but it wasn't")
   839  	}
   840  }
   841  
   842  func TestLoggerFunc(t *testing.T) {
   843  	t.Parallel()
   844  
   845  	const testMsg = "foo"
   846  
   847  	buf := bytes.Buffer{}
   848  	logger := log.New(&buf, "", 0)
   849  
   850  	createAdapterFn := func(logger *log.Logger) pgx.LoggerFunc {
   851  		return func(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
   852  			logger.Printf("%s", testMsg)
   853  		}
   854  	}
   855  
   856  	config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
   857  	config.Logger = createAdapterFn(logger)
   858  
   859  	conn := mustConnect(t, config)
   860  	defer closeConn(t, conn)
   861  
   862  	buf.Reset() // Clear logs written when establishing connection
   863  
   864  	if _, err := conn.Exec(context.TODO(), ";"); err != nil {
   865  		t.Fatal(err)
   866  	}
   867  
   868  	if strings.TrimSpace(buf.String()) != testMsg {
   869  		t.Errorf("Expected logger function to return '%s', but it was '%s'", testMsg, buf.String())
   870  	}
   871  }
   872  
   873  func TestIdentifierSanitize(t *testing.T) {
   874  	t.Parallel()
   875  
   876  	tests := []struct {
   877  		ident    pgx.Identifier
   878  		expected string
   879  	}{
   880  		{
   881  			ident:    pgx.Identifier{`foo`},
   882  			expected: `"foo"`,
   883  		},
   884  		{
   885  			ident:    pgx.Identifier{`select`},
   886  			expected: `"select"`,
   887  		},
   888  		{
   889  			ident:    pgx.Identifier{`foo`, `bar`},
   890  			expected: `"foo"."bar"`,
   891  		},
   892  		{
   893  			ident:    pgx.Identifier{`you should " not do this`},
   894  			expected: `"you should "" not do this"`,
   895  		},
   896  		{
   897  			ident:    pgx.Identifier{`you should " not do this`, `please don't`},
   898  			expected: `"you should "" not do this"."please don't"`,
   899  		},
   900  		{
   901  			ident:    pgx.Identifier{`you should ` + string([]byte{0}) + `not do this`},
   902  			expected: `"you should not do this"`,
   903  		},
   904  	}
   905  
   906  	for i, tt := range tests {
   907  		qval := tt.ident.Sanitize()
   908  		if qval != tt.expected {
   909  			t.Errorf("%d. Expected Sanitize %v to return %v but it was %v", i, tt.ident, tt.expected, qval)
   910  		}
   911  	}
   912  }
   913  
   914  func TestConnInitConnInfo(t *testing.T) {
   915  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
   916  	defer closeConn(t, conn)
   917  
   918  	// spot check that the standard postgres type names aren't qualified
   919  	nameOIDs := map[string]uint32{
   920  		"_int8": pgtype.Int8ArrayOID,
   921  		"int8":  pgtype.Int8OID,
   922  		"json":  pgtype.JSONOID,
   923  		"text":  pgtype.TextOID,
   924  	}
   925  	for name, oid := range nameOIDs {
   926  		dtByName, ok := conn.ConnInfo().DataTypeForName(name)
   927  		if !ok {
   928  			t.Fatalf("Expected type named %v to be present", name)
   929  		}
   930  		dtByOID, ok := conn.ConnInfo().DataTypeForOID(oid)
   931  		if !ok {
   932  			t.Fatalf("Expected type OID %v to be present", oid)
   933  		}
   934  		if dtByName != dtByOID {
   935  			t.Fatalf("Expected type named %v to be the same as type OID %v", name, oid)
   936  		}
   937  	}
   938  
   939  	ensureConnValid(t, conn)
   940  }
   941  
   942  func TestUnregisteredTypeUsableAsStringArgumentAndBaseResult(t *testing.T) {
   943  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   944  		skipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
   945  
   946  		var n uint64
   947  		err := conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
   948  		if err != nil {
   949  			t.Fatal(err)
   950  		}
   951  
   952  		if n != 42 {
   953  			t.Fatalf("Expected n to be 42, but was %v", n)
   954  		}
   955  	})
   956  }
   957  
   958  func TestDomainType(t *testing.T) {
   959  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
   960  		skipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
   961  
   962  		var n uint64
   963  
   964  		// Domain type uint64 is a PostgreSQL domain of underlying type numeric.
   965  
   966  		err := conn.QueryRow(context.Background(), "select $1::uint64", uint64(24)).Scan(&n)
   967  		require.NoError(t, err)
   968  
   969  		// A string can be used. But a string cannot be the result because the describe result from the PostgreSQL server gives
   970  		// the underlying type of numeric.
   971  		err = conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
   972  		if err != nil {
   973  			t.Fatal(err)
   974  		}
   975  		if n != 42 {
   976  			t.Fatalf("Expected n to be 42, but was %v", n)
   977  		}
   978  
   979  		var uint64OID uint32
   980  		err = conn.QueryRow(context.Background(), "select t.oid from pg_type t where t.typname='uint64';").Scan(&uint64OID)
   981  		if err != nil {
   982  			t.Fatalf("did not find uint64 OID, %v", err)
   983  		}
   984  		conn.ConnInfo().RegisterDataType(pgtype.DataType{Value: &pgtype.Numeric{}, Name: "uint64", OID: uint64OID})
   985  
   986  		// String is still an acceptable argument after registration
   987  		err = conn.QueryRow(context.Background(), "select $1::uint64", "7").Scan(&n)
   988  		if err != nil {
   989  			t.Fatal(err)
   990  		}
   991  		if n != 7 {
   992  			t.Fatalf("Expected n to be 7, but was %v", n)
   993  		}
   994  
   995  		// But a uint64 is acceptable
   996  		err = conn.QueryRow(context.Background(), "select $1::uint64", uint64(24)).Scan(&n)
   997  		if err != nil {
   998  			t.Fatal(err)
   999  		}
  1000  		if n != 24 {
  1001  			t.Fatalf("Expected n to be 24, but was %v", n)
  1002  		}
  1003  	})
  1004  }
  1005  
  1006  func TestStmtCacheInvalidationConn(t *testing.T) {
  1007  	ctx := context.Background()
  1008  
  1009  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
  1010  	defer closeConn(t, conn)
  1011  
  1012  	// create a table and fill it with some data
  1013  	_, err := conn.Exec(ctx, `
  1014          DROP TABLE IF EXISTS drop_cols;
  1015          CREATE TABLE drop_cols (
  1016              id SERIAL PRIMARY KEY NOT NULL,
  1017              f1 int NOT NULL,
  1018              f2 int NOT NULL
  1019          );
  1020      `)
  1021  	require.NoError(t, err)
  1022  	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
  1023  	require.NoError(t, err)
  1024  
  1025  	getSQL := "SELECT * FROM drop_cols WHERE id = $1"
  1026  
  1027  	// This query will populate the statement cache. We don't care about the result.
  1028  	rows, err := conn.Query(ctx, getSQL, 1)
  1029  	require.NoError(t, err)
  1030  	rows.Close()
  1031  
  1032  	// Now, change the schema of the table out from under the statement, making it invalid.
  1033  	_, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
  1034  	require.NoError(t, err)
  1035  
  1036  	// We must get an error the first time we try to re-execute a bad statement.
  1037  	// It is up to the application to determine if it wants to try again. We punt to
  1038  	// the application because there is no clear recovery path in the case of failed transactions
  1039  	// or batch operations and because automatic retry is tricky and we don't want to get
  1040  	// it wrong at such an importaint layer of the stack.
  1041  	rows, err = conn.Query(ctx, getSQL, 1)
  1042  	require.NoError(t, err)
  1043  	rows.Next()
  1044  	nextErr := rows.Err()
  1045  	rows.Close()
  1046  	for _, err := range []error{nextErr, rows.Err()} {
  1047  		if err == nil {
  1048  			t.Fatal("expected InvalidCachedStatementPlanError: no error")
  1049  		}
  1050  		if !strings.Contains(err.Error(), "cached plan must not change result type") {
  1051  			t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
  1052  		}
  1053  	}
  1054  
  1055  	// On retry, the statement should have been flushed from the cache.
  1056  	rows, err = conn.Query(ctx, getSQL, 1)
  1057  	require.NoError(t, err)
  1058  	rows.Next()
  1059  	err = rows.Err()
  1060  	require.NoError(t, err)
  1061  	rows.Close()
  1062  	require.NoError(t, rows.Err())
  1063  
  1064  	ensureConnValid(t, conn)
  1065  }
  1066  
  1067  func TestStmtCacheInvalidationTx(t *testing.T) {
  1068  	ctx := context.Background()
  1069  
  1070  	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
  1071  	defer closeConn(t, conn)
  1072  
  1073  	// create a table and fill it with some data
  1074  	_, err := conn.Exec(ctx, `
  1075          DROP TABLE IF EXISTS drop_cols;
  1076          CREATE TABLE drop_cols (
  1077              id SERIAL PRIMARY KEY NOT NULL,
  1078              f1 int NOT NULL,
  1079              f2 int NOT NULL
  1080          );
  1081      `)
  1082  	require.NoError(t, err)
  1083  	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
  1084  	require.NoError(t, err)
  1085  
  1086  	tx, err := conn.Begin(ctx)
  1087  	require.NoError(t, err)
  1088  
  1089  	getSQL := "SELECT * FROM drop_cols WHERE id = $1"
  1090  
  1091  	// This query will populate the statement cache. We don't care about the result.
  1092  	rows, err := tx.Query(ctx, getSQL, 1)
  1093  	require.NoError(t, err)
  1094  	rows.Close()
  1095  
  1096  	// Now, change the schema of the table out from under the statement, making it invalid.
  1097  	_, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
  1098  	require.NoError(t, err)
  1099  
  1100  	// We must get an error the first time we try to re-execute a bad statement.
  1101  	// It is up to the application to determine if it wants to try again. We punt to
  1102  	// the application because there is no clear recovery path in the case of failed transactions
  1103  	// or batch operations and because automatic retry is tricky and we don't want to get
  1104  	// it wrong at such an importaint layer of the stack.
  1105  	rows, err = tx.Query(ctx, getSQL, 1)
  1106  	require.NoError(t, err)
  1107  	rows.Next()
  1108  	nextErr := rows.Err()
  1109  	rows.Close()
  1110  	for _, err := range []error{nextErr, rows.Err()} {
  1111  		if err == nil {
  1112  			t.Fatal("expected InvalidCachedStatementPlanError: no error")
  1113  		}
  1114  		if !strings.Contains(err.Error(), "cached plan must not change result type") {
  1115  			t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
  1116  		}
  1117  	}
  1118  
  1119  	rows, err = tx.Query(ctx, getSQL, 1)
  1120  	require.NoError(t, err) // error does not pop up immediately
  1121  	rows.Next()
  1122  	err = rows.Err()
  1123  	// Retries within the same transaction are errors (really anything except a rollbakc
  1124  	// will be an error in this transaction).
  1125  	require.Error(t, err)
  1126  	rows.Close()
  1127  
  1128  	err = tx.Rollback(ctx)
  1129  	require.NoError(t, err)
  1130  
  1131  	// once we've rolled back, retries will work
  1132  	rows, err = conn.Query(ctx, getSQL, 1)
  1133  	require.NoError(t, err)
  1134  	rows.Next()
  1135  	err = rows.Err()
  1136  	require.NoError(t, err)
  1137  	rows.Close()
  1138  
  1139  	ensureConnValid(t, conn)
  1140  }
  1141  
  1142  func TestInsertDurationInterval(t *testing.T) {
  1143  	testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
  1144  		_, err := conn.Exec(context.Background(), "create temporary table t(duration INTERVAL(0) NOT NULL)")
  1145  		require.NoError(t, err)
  1146  
  1147  		result, err := conn.Exec(context.Background(), "insert into t(duration) values($1)", time.Minute)
  1148  		require.NoError(t, err)
  1149  
  1150  		n := result.RowsAffected()
  1151  		require.EqualValues(t, 1, n)
  1152  	})
  1153  }
  1154  

View as plain text