1 package pgx_test
2
3 import (
4 "bytes"
5 "context"
6 "os"
7 "strings"
8 "sync"
9 "testing"
10 "time"
11
12 "github.com/jackc/pgx/v5"
13 "github.com/jackc/pgx/v5/pgconn"
14 "github.com/jackc/pgx/v5/pgtype"
15 "github.com/jackc/pgx/v5/pgxtest"
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18 )
19
20 func TestCrateDBConnect(t *testing.T) {
21 t.Parallel()
22
23 connString := os.Getenv("PGX_TEST_CRATEDB_CONN_STRING")
24 if connString == "" {
25 t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_CRATEDB_CONN_STRING")
26 }
27
28 conn, err := pgx.Connect(context.Background(), connString)
29 require.Nil(t, err)
30 defer closeConn(t, conn)
31
32 assert.Equal(t, connString, conn.Config().ConnString())
33
34 var result int
35 err = conn.QueryRow(context.Background(), "select 1 +1").Scan(&result)
36 if err != nil {
37 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
38 }
39 if result != 2 {
40 t.Errorf("bad result: %d", result)
41 }
42 }
43
44 func TestConnect(t *testing.T) {
45 t.Parallel()
46
47 connString := os.Getenv("PGX_TEST_DATABASE")
48 config := mustParseConfig(t, connString)
49
50 conn, err := pgx.ConnectConfig(context.Background(), config)
51 if err != nil {
52 t.Fatalf("Unable to establish connection: %v", err)
53 }
54
55 assertConfigsEqual(t, config, conn.Config(), "Conn.Config() returns original config")
56
57 var currentDB string
58 err = conn.QueryRow(context.Background(), "select current_database()").Scan(¤tDB)
59 if err != nil {
60 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
61 }
62 if currentDB != config.Config.Database {
63 t.Errorf("Did not connect to specified database (%v)", config.Config.Database)
64 }
65
66 var user string
67 err = conn.QueryRow(context.Background(), "select current_user").Scan(&user)
68 if err != nil {
69 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
70 }
71 if user != config.Config.User {
72 t.Errorf("Did not connect as specified user (%v)", config.Config.User)
73 }
74
75 err = conn.Close(context.Background())
76 if err != nil {
77 t.Fatal("Unable to close connection")
78 }
79 }
80
81 func TestConnectWithPreferSimpleProtocol(t *testing.T) {
82 t.Parallel()
83
84 connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
85 connConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
86
87 conn := mustConnect(t, connConfig)
88 defer closeConn(t, conn)
89
90
91
92
93 var s pgtype.Text
94 err := conn.QueryRow(context.Background(), "select $1::int4", 42).Scan(&s)
95 require.NoError(t, err)
96 require.Equal(t, pgtype.Text{String: "42", Valid: true}, s)
97
98 ensureConnValid(t, conn)
99 }
100
101 func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
102 config := &pgx.ConnConfig{}
103 require.PanicsWithValue(t, "config must be created by ParseConfig", func() {
104 pgx.ConnectConfig(context.Background(), config)
105 })
106 }
107
108 func TestConfigContainsConnStr(t *testing.T) {
109 connStr := os.Getenv("PGX_TEST_DATABASE")
110 config, err := pgx.ParseConfig(connStr)
111 require.NoError(t, err)
112 assert.Equal(t, connStr, config.ConnString())
113 }
114
115 func TestConfigCopyReturnsEqualConfig(t *testing.T) {
116 connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
117 original, err := pgx.ParseConfig(connString)
118 require.NoError(t, err)
119
120 copied := original.Copy()
121 assertConfigsEqual(t, original, copied, t.Name())
122 }
123
124 func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
125 connString := os.Getenv("PGX_TEST_DATABASE")
126 original, err := pgx.ParseConfig(connString)
127 require.NoError(t, err)
128
129 copied := original.Copy()
130 assert.NotPanics(t, func() {
131 _, err = pgx.ConnectConfig(context.Background(), copied)
132 })
133 assert.NoError(t, err)
134 }
135
136 func TestParseConfigExtractsStatementCacheOptions(t *testing.T) {
137 t.Parallel()
138
139 config, err := pgx.ParseConfig("statement_cache_capacity=0")
140 require.NoError(t, err)
141 require.EqualValues(t, 0, config.StatementCacheCapacity)
142
143 config, err = pgx.ParseConfig("statement_cache_capacity=42")
144 require.NoError(t, err)
145 require.EqualValues(t, 42, config.StatementCacheCapacity)
146
147 config, err = pgx.ParseConfig("description_cache_capacity=0")
148 require.NoError(t, err)
149 require.EqualValues(t, 0, config.DescriptionCacheCapacity)
150
151 config, err = pgx.ParseConfig("description_cache_capacity=42")
152 require.NoError(t, err)
153 require.EqualValues(t, 42, config.DescriptionCacheCapacity)
154
155
156
157
158 config, err = pgx.ParseConfig("default_query_exec_mode=cache_statement")
159 require.NoError(t, err)
160 require.Equal(t, pgx.QueryExecModeCacheStatement, config.DefaultQueryExecMode)
161
162 config, err = pgx.ParseConfig("default_query_exec_mode=cache_describe")
163 require.NoError(t, err)
164 require.Equal(t, pgx.QueryExecModeCacheDescribe, config.DefaultQueryExecMode)
165
166 config, err = pgx.ParseConfig("default_query_exec_mode=describe_exec")
167 require.NoError(t, err)
168 require.Equal(t, pgx.QueryExecModeDescribeExec, config.DefaultQueryExecMode)
169
170 config, err = pgx.ParseConfig("default_query_exec_mode=exec")
171 require.NoError(t, err)
172 require.Equal(t, pgx.QueryExecModeExec, config.DefaultQueryExecMode)
173
174 config, err = pgx.ParseConfig("default_query_exec_mode=simple_protocol")
175 require.NoError(t, err)
176 require.Equal(t, pgx.QueryExecModeSimpleProtocol, config.DefaultQueryExecMode)
177 }
178
179 func TestParseConfigExtractsDefaultQueryExecMode(t *testing.T) {
180 t.Parallel()
181
182 for _, tt := range []struct {
183 connString string
184 defaultQueryExecMode pgx.QueryExecMode
185 }{
186 {"", pgx.QueryExecModeCacheStatement},
187 {"default_query_exec_mode=cache_statement", pgx.QueryExecModeCacheStatement},
188 {"default_query_exec_mode=cache_describe", pgx.QueryExecModeCacheDescribe},
189 {"default_query_exec_mode=describe_exec", pgx.QueryExecModeDescribeExec},
190 {"default_query_exec_mode=exec", pgx.QueryExecModeExec},
191 {"default_query_exec_mode=simple_protocol", pgx.QueryExecModeSimpleProtocol},
192 } {
193 config, err := pgx.ParseConfig(tt.connString)
194 require.NoError(t, err)
195 require.Equalf(t, tt.defaultQueryExecMode, config.DefaultQueryExecMode, "connString: `%s`", tt.connString)
196 require.Empty(t, config.RuntimeParams["default_query_exec_mode"])
197 }
198 }
199
200 func TestParseConfigErrors(t *testing.T) {
201 t.Parallel()
202
203 for _, tt := range []struct {
204 connString string
205 expectedErrSubstring string
206 }{
207 {"default_query_exec_mode=does_not_exist", "does_not_exist"},
208 } {
209 config, err := pgx.ParseConfig(tt.connString)
210 require.Nil(t, config)
211 require.ErrorContains(t, err, tt.expectedErrSubstring)
212 }
213 }
214
215 func TestExec(t *testing.T) {
216 t.Parallel()
217
218 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
219 defer cancel()
220
221 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
222 if results := mustExec(t, conn, "create temporary table foo(id integer primary key);"); results.String() != "CREATE TABLE" {
223 t.Error("Unexpected results from Exec")
224 }
225
226
227 if results := mustExec(t, conn, "insert into foo(id) values($1)", 1); results.String() != "INSERT 0 1" {
228 t.Errorf("Unexpected results from Exec: %v", results)
229 }
230
231 if results := mustExec(t, conn, "drop table foo;"); results.String() != "DROP TABLE" {
232 t.Error("Unexpected results from Exec")
233 }
234
235
236 if results := mustExec(t, conn, "create temporary table foo(id serial primary key); drop table foo;"); results.String() != "DROP TABLE" {
237 t.Error("Unexpected results from Exec")
238 }
239
240
241 if results := mustExec(t, conn, strings.Repeat("select 42; ", 1000)); results.String() != "SELECT 1" {
242 t.Errorf("Unexpected results from Exec: %v", results)
243 }
244
245
246 if results := mustExec(t, conn, "--;"); results.String() != "" {
247 t.Errorf("Unexpected results from Exec: %v", results)
248 }
249 })
250 }
251
252 type testQueryRewriter struct {
253 sql string
254 args []any
255 }
256
257 func (qr *testQueryRewriter) RewriteQuery(ctx context.Context, conn *pgx.Conn, sql string, args []any) (newSQL string, newArgs []any, err error) {
258 return qr.sql, qr.args, nil
259 }
260
261 func TestExecWithQueryRewriter(t *testing.T) {
262 t.Parallel()
263
264 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
265 defer cancel()
266
267 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
268 qr := testQueryRewriter{sql: "select $1::int", args: []any{42}}
269 _, err := conn.Exec(ctx, "should be replaced", &qr)
270 require.NoError(t, err)
271 })
272 }
273
274 func TestExecFailure(t *testing.T) {
275 t.Parallel()
276
277 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
278 defer cancel()
279
280 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
281 if _, err := conn.Exec(context.Background(), "selct;"); err == nil {
282 t.Fatal("Expected SQL syntax error")
283 }
284
285 rows, _ := conn.Query(context.Background(), "select 1")
286 rows.Close()
287 if rows.Err() != nil {
288 t.Fatalf("Exec failure appears to have broken connection: %v", rows.Err())
289 }
290 })
291 }
292
293 func TestExecFailureWithArguments(t *testing.T) {
294 t.Parallel()
295
296 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
297 defer cancel()
298
299 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
300 _, err := conn.Exec(context.Background(), "selct $1;", 1)
301 if err == nil {
302 t.Fatal("Expected SQL syntax error")
303 }
304 assert.False(t, pgconn.SafeToRetry(err))
305
306 _, err = conn.Exec(context.Background(), "select $1::varchar(1);", "1", "2")
307 require.Error(t, err)
308 })
309 }
310
311 func TestExecContextWithoutCancelation(t *testing.T) {
312 t.Parallel()
313
314 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
315 defer cancel()
316
317 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
318 ctx, cancelFunc := context.WithCancel(ctx)
319 defer cancelFunc()
320
321 commandTag, err := conn.Exec(ctx, "create temporary table foo(id integer primary key);")
322 if err != nil {
323 t.Fatal(err)
324 }
325 if commandTag.String() != "CREATE TABLE" {
326 t.Fatalf("Unexpected results from Exec: %v", commandTag)
327 }
328 assert.False(t, pgconn.SafeToRetry(err))
329 })
330 }
331
332 func TestExecContextFailureWithoutCancelation(t *testing.T) {
333 t.Parallel()
334
335 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
336 defer cancel()
337
338 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
339 ctx, cancelFunc := context.WithCancel(ctx)
340 defer cancelFunc()
341
342 _, err := conn.Exec(ctx, "selct;")
343 if err == nil {
344 t.Fatal("Expected SQL syntax error")
345 }
346 assert.False(t, pgconn.SafeToRetry(err))
347
348 rows, _ := conn.Query(context.Background(), "select 1")
349 rows.Close()
350 if rows.Err() != nil {
351 t.Fatalf("ExecEx failure appears to have broken connection: %v", rows.Err())
352 }
353 assert.False(t, pgconn.SafeToRetry(err))
354 })
355 }
356
357 func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
358 t.Parallel()
359
360 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
361 defer cancel()
362
363 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
364 ctx, cancelFunc := context.WithCancel(ctx)
365 defer cancelFunc()
366
367 _, err := conn.Exec(ctx, "selct $1;", 1)
368 if err == nil {
369 t.Fatal("Expected SQL syntax error")
370 }
371 assert.False(t, pgconn.SafeToRetry(err))
372 })
373 }
374
375 func TestExecFailureCloseBefore(t *testing.T) {
376 t.Parallel()
377
378 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
379 closeConn(t, conn)
380
381 _, err := conn.Exec(context.Background(), "select 1")
382 require.Error(t, err)
383 assert.True(t, pgconn.SafeToRetry(err))
384 }
385
386 func TestExecPerQuerySimpleProtocol(t *testing.T) {
387 t.Parallel()
388
389 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
390 defer closeConn(t, conn)
391
392 ctx, cancelFunc := context.WithCancel(context.Background())
393 defer cancelFunc()
394
395 commandTag, err := conn.Exec(ctx, "create temporary table foo(name varchar primary key);")
396 if err != nil {
397 t.Fatal(err)
398 }
399 if commandTag.String() != "CREATE TABLE" {
400 t.Fatalf("Unexpected results from Exec: %v", commandTag)
401 }
402
403 commandTag, err = conn.Exec(ctx,
404 "insert into foo(name) values($1);",
405 pgx.QueryExecModeSimpleProtocol,
406 "bar'; drop table foo;--",
407 )
408 if err != nil {
409 t.Fatal(err)
410 }
411 if commandTag.String() != "INSERT 0 1" {
412 t.Fatalf("Unexpected results from Exec: %v", commandTag)
413 }
414
415 }
416
417 func TestPrepare(t *testing.T) {
418 t.Parallel()
419
420 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
421 defer closeConn(t, conn)
422
423 _, err := conn.Prepare(context.Background(), "test", "select $1::varchar")
424 if err != nil {
425 t.Errorf("Unable to prepare statement: %v", err)
426 return
427 }
428
429 var s string
430 err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
431 if err != nil {
432 t.Errorf("Executing prepared statement failed: %v", err)
433 }
434
435 if s != "hello" {
436 t.Errorf("Prepared statement did not return expected value: %v", s)
437 }
438
439 err = conn.Deallocate(context.Background(), "test")
440 if err != nil {
441 t.Errorf("conn.Deallocate failed: %v", err)
442 }
443
444
445
446
447 _, err = conn.Prepare(context.Background(), "test", "select $1::integer")
448 if err != nil {
449 t.Errorf("Unable to prepare statement: %v", err)
450 return
451 }
452
453 var n int32
454 err = conn.QueryRow(context.Background(), "test", int32(1)).Scan(&n)
455 if err != nil {
456 t.Errorf("Executing prepared statement failed: %v", err)
457 }
458
459 if n != 1 {
460 t.Errorf("Prepared statement did not return expected value: %v", s)
461 }
462
463 err = conn.DeallocateAll(context.Background())
464 if err != nil {
465 t.Errorf("conn.Deallocate failed: %v", err)
466 }
467 }
468
469 func TestPrepareBadSQLFailure(t *testing.T) {
470 t.Parallel()
471
472 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
473 defer closeConn(t, conn)
474
475 if _, err := conn.Prepare(context.Background(), "badSQL", "select foo"); err == nil {
476 t.Fatal("Prepare should have failed with syntax error")
477 }
478
479 ensureConnValid(t, conn)
480 }
481
482 func TestPrepareIdempotency(t *testing.T) {
483 t.Parallel()
484
485 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
486 defer cancel()
487
488 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
489 for i := 0; i < 2; i++ {
490 _, err := conn.Prepare(context.Background(), "test", "select 42::integer")
491 if err != nil {
492 t.Fatalf("%d. Unable to prepare statement: %v", i, err)
493 }
494
495 var n int32
496 err = conn.QueryRow(context.Background(), "test").Scan(&n)
497 if err != nil {
498 t.Errorf("%d. Executing prepared statement failed: %v", i, err)
499 }
500
501 if n != int32(42) {
502 t.Errorf("%d. Prepared statement did not return expected value: %v", i, n)
503 }
504 }
505
506 _, err := conn.Prepare(context.Background(), "test", "select 'fail'::varchar")
507 if err == nil {
508 t.Fatalf("Prepare statement with same name but different SQL should have failed but it didn't")
509 return
510 }
511 })
512 }
513
514 func TestPrepareStatementCacheModes(t *testing.T) {
515 t.Parallel()
516
517 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
518 defer cancel()
519
520 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
521 _, err := conn.Prepare(context.Background(), "test", "select $1::text")
522 require.NoError(t, err)
523
524 var s string
525 err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
526 require.NoError(t, err)
527 require.Equal(t, "hello", s)
528 })
529 }
530
531 func TestPrepareWithDigestedName(t *testing.T) {
532 t.Parallel()
533
534 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
535 defer cancel()
536
537 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
538 sql := "select $1::text"
539 sd, err := conn.Prepare(ctx, sql, sql)
540 require.NoError(t, err)
541 require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
542
543 var s string
544 err = conn.QueryRow(ctx, sql, "hello").Scan(&s)
545 require.NoError(t, err)
546 require.Equal(t, "hello", s)
547
548 err = conn.Deallocate(ctx, sql)
549 require.NoError(t, err)
550 })
551 }
552
553
554 func TestDeallocateInAbortedTransaction(t *testing.T) {
555 t.Parallel()
556
557 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
558 defer cancel()
559
560 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
561 tx, err := conn.Begin(ctx)
562 require.NoError(t, err)
563
564 sql := "select $1::text"
565 sd, err := tx.Prepare(ctx, sql, sql)
566 require.NoError(t, err)
567 require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
568
569 var s string
570 err = tx.QueryRow(ctx, sql, "hello").Scan(&s)
571 require.NoError(t, err)
572 require.Equal(t, "hello", s)
573
574 _, err = tx.Exec(ctx, "select 1/0")
575 require.Error(t, err)
576
577 err = conn.Deallocate(ctx, sql)
578 require.NoError(t, err)
579
580 err = tx.Rollback(ctx)
581 require.NoError(t, err)
582
583 sd, err = conn.Prepare(ctx, sql, sql)
584 require.NoError(t, err)
585 require.Equal(t, "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6", sd.Name)
586 })
587 }
588
589 func TestDeallocateMissingPreparedStatementStillClearsFromPreparedStatementMap(t *testing.T) {
590 t.Parallel()
591
592 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
593 defer cancel()
594
595 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
596 _, err := conn.Prepare(ctx, "ps", "select $1::text")
597 require.NoError(t, err)
598
599 _, err = conn.Exec(ctx, "deallocate ps")
600 require.NoError(t, err)
601
602 err = conn.Deallocate(ctx, "ps")
603 require.NoError(t, err)
604
605 _, err = conn.Prepare(ctx, "ps", "select $1::text, $2::text")
606 require.NoError(t, err)
607
608 var s1, s2 string
609 err = conn.QueryRow(ctx, "ps", "hello", "world").Scan(&s1, &s2)
610 require.NoError(t, err)
611 require.Equal(t, "hello", s1)
612 require.Equal(t, "world", s2)
613 })
614 }
615
616 func TestListenNotify(t *testing.T) {
617 t.Parallel()
618
619 listener := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
620 defer closeConn(t, listener)
621
622 if listener.PgConn().ParameterStatus("crdb_version") != "" {
623 t.Skip("Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
624 }
625
626 mustExec(t, listener, "listen chat")
627
628 notifier := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
629 defer closeConn(t, notifier)
630
631 mustExec(t, notifier, "notify chat")
632
633
634 notification, err := listener.WaitForNotification(context.Background())
635 require.NoError(t, err)
636 assert.Equal(t, "chat", notification.Channel)
637
638
639 mustExec(t, notifier, "notify chat")
640 rows, _ := listener.Query(context.Background(), "select 1")
641 rows.Close()
642 require.NoError(t, rows.Err())
643
644 ctx, cancelFn := context.WithCancel(context.Background())
645 cancelFn()
646 notification, err = listener.WaitForNotification(ctx)
647 require.NoError(t, err)
648 assert.Equal(t, "chat", notification.Channel)
649
650
651 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
652 defer cancel()
653 notification, err = listener.WaitForNotification(ctx)
654 assert.True(t, pgconn.Timeout(err))
655 assert.Nil(t, notification)
656
657
658 mustExec(t, notifier, "notify chat")
659 notification, err = listener.WaitForNotification(context.Background())
660 require.NoError(t, err)
661 assert.Equal(t, "chat", notification.Channel)
662 }
663
664 func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
665 t.Parallel()
666
667 func() {
668 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
669 defer closeConn(t, conn)
670 pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
671 }()
672
673 listenerDone := make(chan bool)
674 notifierDone := make(chan bool)
675 listening := make(chan bool)
676 go func() {
677 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
678 defer closeConn(t, conn)
679 defer func() {
680 listenerDone <- true
681 }()
682
683 mustExec(t, conn, "listen busysafe")
684 listening <- true
685
686 for i := 0; i < 5000; i++ {
687 var sum int32
688 var rowCount int32
689
690 rows, err := conn.Query(context.Background(), "select generate_series(1,$1)", 100)
691 if err != nil {
692 t.Errorf("conn.Query failed: %v", err)
693 return
694 }
695
696 for rows.Next() {
697 var n int32
698 if err := rows.Scan(&n); err != nil {
699 t.Errorf("Row scan failed: %v", err)
700 return
701 }
702 sum += n
703 rowCount++
704 }
705
706 if rows.Err() != nil {
707 t.Errorf("conn.Query failed: %v", rows.Err())
708 return
709 }
710
711 if sum != 5050 {
712 t.Errorf("Wrong rows sum: %v", sum)
713 return
714 }
715
716 if rowCount != 100 {
717 t.Errorf("Wrong number of rows: %v", rowCount)
718 return
719 }
720 }
721 }()
722
723 go func() {
724 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
725 defer closeConn(t, conn)
726 defer func() {
727 notifierDone <- true
728 }()
729
730 <-listening
731
732 for i := 0; i < 100000; i++ {
733 mustExec(t, conn, "notify busysafe, 'hello'")
734 }
735 }()
736
737 <-listenerDone
738 <-notifierDone
739 }
740
741 func TestListenNotifySelfNotification(t *testing.T) {
742 t.Parallel()
743
744 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
745 defer closeConn(t, conn)
746
747 pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
748
749 mustExec(t, conn, "listen self")
750
751
752 mustExec(t, conn, "notify self")
753
754 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
755 defer cancel()
756 notification, err := conn.WaitForNotification(ctx)
757 require.NoError(t, err)
758 assert.Equal(t, "self", notification.Channel)
759
760
761 mustExec(t, conn, "notify self")
762
763 rows, _ := conn.Query(context.Background(), "select 1")
764 rows.Close()
765 if rows.Err() != nil {
766 t.Fatalf("Unexpected error on Query: %v", rows.Err())
767 }
768
769 ctx, cncl := context.WithTimeout(context.Background(), time.Second)
770 defer cncl()
771 notification, err = conn.WaitForNotification(ctx)
772 require.NoError(t, err)
773 assert.Equal(t, "self", notification.Channel)
774 }
775
776 func TestFatalRxError(t *testing.T) {
777 t.Parallel()
778
779 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
780 defer closeConn(t, conn)
781
782 pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
783
784 var wg sync.WaitGroup
785 wg.Add(1)
786 go func() {
787 defer wg.Done()
788 var n int32
789 var s string
790 err := conn.QueryRow(context.Background(), "select 1::int4, pg_sleep(10)::varchar").Scan(&n, &s)
791 if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Severity == "FATAL" {
792 } else {
793 t.Errorf("Expected QueryRow Scan to return fatal PgError, but instead received %v", err)
794 return
795 }
796 }()
797
798 otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
799 defer otherConn.Close(context.Background())
800
801 if _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID()); err != nil {
802 t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
803 }
804
805 wg.Wait()
806
807 if !conn.IsClosed() {
808 t.Fatal("Connection should be closed")
809 }
810 }
811
812 func TestFatalTxError(t *testing.T) {
813 t.Parallel()
814
815
816 for i := 0; i < 50; i++ {
817 func() {
818 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
819 defer closeConn(t, conn)
820
821 pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
822
823 otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
824 defer otherConn.Close(context.Background())
825
826 _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
827 if err != nil {
828 t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
829 }
830
831 err = conn.QueryRow(context.Background(), "select 1").Scan(nil)
832 if err == nil {
833 t.Fatal("Expected error but none occurred")
834 }
835
836 if !conn.IsClosed() {
837 t.Fatalf("Connection should be closed but isn't. Previous Query err: %v", err)
838 }
839 }()
840 }
841 }
842
843 func TestInsertBoolArray(t *testing.T) {
844 t.Parallel()
845
846 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
847 defer cancel()
848
849 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
850 if results := mustExec(t, conn, "create temporary table foo(spice bool[]);"); results.String() != "CREATE TABLE" {
851 t.Error("Unexpected results from Exec")
852 }
853
854
855 if results := mustExec(t, conn, "insert into foo(spice) values($1)", []bool{true, false, true}); results.String() != "INSERT 0 1" {
856 t.Errorf("Unexpected results from Exec: %v", results)
857 }
858 })
859 }
860
861 func TestInsertTimestampArray(t *testing.T) {
862 t.Parallel()
863
864 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
865 defer cancel()
866
867 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
868 if results := mustExec(t, conn, "create temporary table foo(spice timestamp[]);"); results.String() != "CREATE TABLE" {
869 t.Error("Unexpected results from Exec")
870 }
871
872
873 if results := mustExec(t, conn, "insert into foo(spice) values($1)", []time.Time{time.Unix(1419143667, 0), time.Unix(1419143672, 0)}); results.String() != "INSERT 0 1" {
874 t.Errorf("Unexpected results from Exec: %v", results)
875 }
876 })
877 }
878
879 func TestIdentifierSanitize(t *testing.T) {
880 t.Parallel()
881
882 tests := []struct {
883 ident pgx.Identifier
884 expected string
885 }{
886 {
887 ident: pgx.Identifier{`foo`},
888 expected: `"foo"`,
889 },
890 {
891 ident: pgx.Identifier{`select`},
892 expected: `"select"`,
893 },
894 {
895 ident: pgx.Identifier{`foo`, `bar`},
896 expected: `"foo"."bar"`,
897 },
898 {
899 ident: pgx.Identifier{`you should " not do this`},
900 expected: `"you should "" not do this"`,
901 },
902 {
903 ident: pgx.Identifier{`you should " not do this`, `please don't`},
904 expected: `"you should "" not do this"."please don't"`,
905 },
906 {
907 ident: pgx.Identifier{`you should ` + string([]byte{0}) + `not do this`},
908 expected: `"you should not do this"`,
909 },
910 }
911
912 for i, tt := range tests {
913 qval := tt.ident.Sanitize()
914 if qval != tt.expected {
915 t.Errorf("%d. Expected Sanitize %v to return %v but it was %v", i, tt.ident, tt.expected, qval)
916 }
917 }
918 }
919
920 func TestConnInitTypeMap(t *testing.T) {
921 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
922 defer closeConn(t, conn)
923
924
925 nameOIDs := map[string]uint32{
926 "_int8": pgtype.Int8ArrayOID,
927 "int8": pgtype.Int8OID,
928 "json": pgtype.JSONOID,
929 "text": pgtype.TextOID,
930 }
931 for name, oid := range nameOIDs {
932 dtByName, ok := conn.TypeMap().TypeForName(name)
933 if !ok {
934 t.Fatalf("Expected type named %v to be present", name)
935 }
936 dtByOID, ok := conn.TypeMap().TypeForOID(oid)
937 if !ok {
938 t.Fatalf("Expected type OID %v to be present", oid)
939 }
940 if dtByName != dtByOID {
941 t.Fatalf("Expected type named %v to be the same as type OID %v", name, oid)
942 }
943 }
944
945 ensureConnValid(t, conn)
946 }
947
948 func TestUnregisteredTypeUsableAsStringArgumentAndBaseResult(t *testing.T) {
949 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
950 defer cancel()
951
952 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
953 pgxtest.SkipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
954
955 var n uint64
956 err := conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
957 if err != nil {
958 t.Fatal(err)
959 }
960
961 if n != 42 {
962 t.Fatalf("Expected n to be 42, but was %v", n)
963 }
964 })
965 }
966
967 func TestDomainType(t *testing.T) {
968 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
969 defer cancel()
970
971 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
972 pgxtest.SkipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
973
974
975
976
977
978
979 var s string
980 err := conn.QueryRow(ctx, "select $1::uint64", "24").Scan(&s)
981 require.NoError(t, err)
982 require.Equal(t, "24", s)
983
984
985 uint64Type, err := conn.LoadType(ctx, "uint64")
986 require.NoError(t, err)
987 conn.TypeMap().RegisterType(uint64Type)
988
989 var n uint64
990 err = conn.QueryRow(ctx, "select $1::uint64", uint64(24)).Scan(&n)
991 require.NoError(t, err)
992
993
994 err = conn.QueryRow(ctx, "select $1::uint64", "7").Scan(&n)
995 if err != nil {
996 t.Fatal(err)
997 }
998 if n != 7 {
999 t.Fatalf("Expected n to be 7, but was %v", n)
1000 }
1001 })
1002 }
1003
1004 func TestLoadTypeSameNameInDifferentSchemas(t *testing.T) {
1005 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1006 defer cancel()
1007
1008 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1009 pgxtest.SkipCockroachDB(t, conn, "Server does support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")
1010
1011 tx, err := conn.Begin(ctx)
1012 require.NoError(t, err)
1013 defer tx.Rollback(ctx)
1014
1015 _, err = tx.Exec(ctx, `create schema pgx_a;
1016 create type pgx_a.point as (a text, b text);
1017 create schema pgx_b;
1018 create type pgx_b.point as (c text);
1019 `)
1020 require.NoError(t, err)
1021
1022
1023 for _, typename := range []string{"pgx_a.point", "pgx_b.point"} {
1024
1025
1026 dt, err := conn.LoadType(ctx, typename)
1027 require.NoError(t, err)
1028 conn.TypeMap().RegisterType(dt)
1029 }
1030
1031 type aPoint struct {
1032 A string
1033 B string
1034 }
1035
1036 type bPoint struct {
1037 C string
1038 }
1039
1040 var a aPoint
1041 var b bPoint
1042 err = tx.QueryRow(ctx, `select '(foo,bar)'::pgx_a.point, '(baz)'::pgx_b.point`).Scan(&a, &b)
1043 require.NoError(t, err)
1044 require.Equal(t, aPoint{"foo", "bar"}, a)
1045 require.Equal(t, bPoint{"baz"}, b)
1046 })
1047 }
1048
1049 func TestLoadCompositeType(t *testing.T) {
1050 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1051 defer cancel()
1052
1053 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1054 pgxtest.SkipCockroachDB(t, conn, "Server does support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")
1055
1056 tx, err := conn.Begin(ctx)
1057 require.NoError(t, err)
1058 defer tx.Rollback(ctx)
1059
1060 _, err = tx.Exec(ctx, "create type compositetype as (attr1 int, attr2 int)")
1061 require.NoError(t, err)
1062
1063 _, err = tx.Exec(ctx, "alter type compositetype drop attribute attr1")
1064 require.NoError(t, err)
1065
1066 _, err = conn.LoadType(ctx, "compositetype")
1067 require.NoError(t, err)
1068 })
1069 }
1070
1071 func TestLoadRangeType(t *testing.T) {
1072 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1073 defer cancel()
1074
1075 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1076 pgxtest.SkipCockroachDB(t, conn, "Server does support range types")
1077
1078 tx, err := conn.Begin(ctx)
1079 require.NoError(t, err)
1080 defer tx.Rollback(ctx)
1081
1082 _, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi)")
1083 require.NoError(t, err)
1084
1085
1086 newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
1087 require.NoError(t, err)
1088 conn.TypeMap().RegisterType(newRangeType)
1089 conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")
1090
1091 var inputRangeType = pgtype.Range[float64]{
1092 Lower: 1.0,
1093 Upper: 2.0,
1094 LowerType: pgtype.Inclusive,
1095 UpperType: pgtype.Inclusive,
1096 Valid: true,
1097 }
1098 var outputRangeType pgtype.Range[float64]
1099 err = tx.QueryRow(ctx, "SELECT $1::examplefloatrange", inputRangeType).Scan(&outputRangeType)
1100 require.NoError(t, err)
1101 require.Equal(t, inputRangeType, outputRangeType)
1102 })
1103 }
1104
1105 func TestLoadMultiRangeType(t *testing.T) {
1106 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1107 defer cancel()
1108
1109 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1110 pgxtest.SkipCockroachDB(t, conn, "Server does support range types")
1111 pgxtest.SkipPostgreSQLVersionLessThan(t, conn, 14)
1112
1113 tx, err := conn.Begin(ctx)
1114 require.NoError(t, err)
1115 defer tx.Rollback(ctx)
1116
1117 _, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi, multirange_type_name=examplefloatmultirange)")
1118 require.NoError(t, err)
1119
1120
1121 newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
1122 require.NoError(t, err)
1123 conn.TypeMap().RegisterType(newRangeType)
1124 conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")
1125
1126 newMultiRangeType, err := conn.LoadType(ctx, "examplefloatmultirange")
1127 require.NoError(t, err)
1128 conn.TypeMap().RegisterType(newMultiRangeType)
1129 conn.TypeMap().RegisterDefaultPgType(pgtype.Multirange[pgtype.Range[float64]]{}, "examplefloatmultirange")
1130
1131 var inputMultiRangeType = pgtype.Multirange[pgtype.Range[float64]]{
1132 {
1133 Lower: 1.0,
1134 Upper: 2.0,
1135 LowerType: pgtype.Inclusive,
1136 UpperType: pgtype.Inclusive,
1137 Valid: true,
1138 },
1139 {
1140 Lower: 3.0,
1141 Upper: 4.0,
1142 LowerType: pgtype.Exclusive,
1143 UpperType: pgtype.Exclusive,
1144 Valid: true,
1145 },
1146 }
1147 var outputMultiRangeType pgtype.Multirange[pgtype.Range[float64]]
1148 err = tx.QueryRow(ctx, "SELECT $1::examplefloatmultirange", inputMultiRangeType).Scan(&outputMultiRangeType)
1149 require.NoError(t, err)
1150 require.Equal(t, inputMultiRangeType, outputMultiRangeType)
1151 })
1152 }
1153
1154 func TestStmtCacheInvalidationConn(t *testing.T) {
1155 ctx := context.Background()
1156
1157 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
1158 defer closeConn(t, conn)
1159
1160
1161 _, err := conn.Exec(ctx, `
1162 DROP TABLE IF EXISTS drop_cols;
1163 CREATE TABLE drop_cols (
1164 id SERIAL PRIMARY KEY NOT NULL,
1165 f1 int NOT NULL,
1166 f2 int NOT NULL
1167 );
1168 `)
1169 require.NoError(t, err)
1170 _, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
1171 require.NoError(t, err)
1172
1173 getSQL := "SELECT * FROM drop_cols WHERE id = $1"
1174
1175
1176 rows, err := conn.Query(ctx, getSQL, 1)
1177 require.NoError(t, err)
1178 rows.Close()
1179 require.NoError(t, rows.Err())
1180
1181
1182 _, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
1183 require.NoError(t, err)
1184
1185
1186
1187
1188
1189
1190 rows, err = conn.Query(ctx, getSQL, 1)
1191 require.NoError(t, err)
1192 rows.Next()
1193 nextErr := rows.Err()
1194 rows.Close()
1195 for _, err := range []error{nextErr, rows.Err()} {
1196 if err == nil {
1197 t.Fatal(`expected "cached plan must not change result type": no error`)
1198 }
1199 if !strings.Contains(err.Error(), "cached plan must not change result type") {
1200 t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
1201 }
1202 }
1203
1204
1205 rows, err = conn.Query(ctx, getSQL, 1)
1206 require.NoError(t, err)
1207 rows.Next()
1208 err = rows.Err()
1209 require.NoError(t, err)
1210 rows.Close()
1211 require.NoError(t, rows.Err())
1212
1213 ensureConnValid(t, conn)
1214 }
1215
1216 func TestStmtCacheInvalidationTx(t *testing.T) {
1217 ctx := context.Background()
1218
1219 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
1220 defer closeConn(t, conn)
1221
1222 if conn.PgConn().ParameterStatus("crdb_version") != "" {
1223 t.Skip("Server has non-standard prepare in errored transaction behavior (https://github.com/cockroachdb/cockroach/issues/84140)")
1224 }
1225
1226
1227 _, err := conn.Exec(ctx, `
1228 DROP TABLE IF EXISTS drop_cols;
1229 CREATE TABLE drop_cols (
1230 id SERIAL PRIMARY KEY NOT NULL,
1231 f1 int NOT NULL,
1232 f2 int NOT NULL
1233 );
1234 `)
1235 require.NoError(t, err)
1236 _, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
1237 require.NoError(t, err)
1238
1239 tx, err := conn.Begin(ctx)
1240 require.NoError(t, err)
1241
1242 getSQL := "SELECT * FROM drop_cols WHERE id = $1"
1243
1244
1245 rows, err := tx.Query(ctx, getSQL, 1)
1246 require.NoError(t, err)
1247 rows.Close()
1248 require.NoError(t, rows.Err())
1249
1250
1251 _, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
1252 require.NoError(t, err)
1253
1254
1255
1256
1257
1258
1259 rows, err = tx.Query(ctx, getSQL, 1)
1260 require.NoError(t, err)
1261 rows.Next()
1262 nextErr := rows.Err()
1263 rows.Close()
1264 for _, err := range []error{nextErr, rows.Err()} {
1265 if err == nil {
1266 t.Fatal(`expected "cached plan must not change result type": no error`)
1267 }
1268 if !strings.Contains(err.Error(), "cached plan must not change result type") {
1269 t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
1270 }
1271 }
1272
1273 rows, _ = tx.Query(ctx, getSQL, 1)
1274 rows.Close()
1275 err = rows.Err()
1276
1277
1278 require.Error(t, err)
1279 rows.Close()
1280
1281 err = tx.Rollback(ctx)
1282 require.NoError(t, err)
1283
1284
1285 rows, err = conn.Query(ctx, getSQL, 1)
1286 require.NoError(t, err)
1287 rows.Next()
1288 err = rows.Err()
1289 require.NoError(t, err)
1290 rows.Close()
1291
1292 ensureConnValid(t, conn)
1293 }
1294
1295 func TestInsertDurationInterval(t *testing.T) {
1296 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1297 defer cancel()
1298
1299 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1300 _, err := conn.Exec(context.Background(), "create temporary table t(duration INTERVAL(0) NOT NULL)")
1301 require.NoError(t, err)
1302
1303 result, err := conn.Exec(context.Background(), "insert into t(duration) values($1)", time.Minute)
1304 require.NoError(t, err)
1305
1306 n := result.RowsAffected()
1307 require.EqualValues(t, 1, n)
1308 })
1309 }
1310
1311 func TestRawValuesUnderlyingMemoryReused(t *testing.T) {
1312 defaultConnTestRunner.RunTest(context.Background(), t, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1313 var buf []byte
1314
1315 rows, err := conn.Query(ctx, `select 1::int`)
1316 require.NoError(t, err)
1317
1318 for rows.Next() {
1319 buf = rows.RawValues()[0]
1320 }
1321
1322 require.NoError(t, rows.Err())
1323
1324 original := make([]byte, len(buf))
1325 copy(original, buf)
1326
1327 for i := 0; i < 1_000_000; i++ {
1328 rows, err := conn.Query(ctx, `select $1::int`, i)
1329 require.NoError(t, err)
1330 rows.Close()
1331 require.NoError(t, rows.Err())
1332
1333 if !bytes.Equal(original, buf) {
1334 return
1335 }
1336 }
1337
1338 t.Fatal("expected buffer from RawValues to be overwritten by subsequent queries but it was not")
1339 })
1340 }
1341
1342
1343 func TestConnDeallocateInvalidatedCachedStatementsWhenCanceled(t *testing.T) {
1344 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1345 defer cancel()
1346
1347 pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1348 pgxtest.SkipCockroachDB(t, conn, "CockroachDB returns decimal instead of integer for integer division")
1349
1350 var n int32
1351 err := conn.QueryRow(ctx, "select 1 / $1::int", 1).Scan(&n)
1352 require.NoError(t, err)
1353 require.EqualValues(t, 1, n)
1354
1355
1356
1357
1358 err = conn.QueryRow(ctx, "select 1 / $1::int", 0).Scan(&n)
1359 require.Error(t, err)
1360
1361 ctx2, cancel2 := context.WithCancel(ctx)
1362 cancel2()
1363 err = conn.QueryRow(ctx2, "select 1 / $1::int", 1).Scan(&n)
1364 require.Error(t, err)
1365 require.ErrorIs(t, err, context.Canceled)
1366
1367 err = conn.QueryRow(ctx, "select 1 / $1::int", 1).Scan(&n)
1368 require.NoError(t, err)
1369 require.EqualValues(t, 1, n)
1370 })
1371 }
1372
1373
1374 func TestConnDeallocateInvalidatedCachedStatementsInTransactionWithBatch(t *testing.T) {
1375 t.Parallel()
1376
1377 ctx := context.Background()
1378
1379 connString := os.Getenv("PGX_TEST_DATABASE")
1380 config := mustParseConfig(t, connString)
1381 config.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement
1382 config.StatementCacheCapacity = 2
1383
1384 conn, err := pgx.ConnectConfig(ctx, config)
1385 require.NoError(t, err)
1386
1387 tx, err := conn.Begin(ctx)
1388 require.NoError(t, err)
1389 defer tx.Rollback(ctx)
1390
1391 _, err = tx.Exec(ctx, "select $1::int + 1", 1)
1392 require.NoError(t, err)
1393
1394 _, err = tx.Exec(ctx, "select $1::int + 2", 1)
1395 require.NoError(t, err)
1396
1397
1398 _, err = tx.Exec(ctx, "select $1::int + 3", 1)
1399 require.NoError(t, err)
1400
1401 batch := &pgx.Batch{}
1402 batch.Queue("select $1::int + 1", 1)
1403 err = tx.SendBatch(ctx, batch).Close()
1404 require.NoError(t, err)
1405
1406 err = tx.Rollback(ctx)
1407 require.NoError(t, err)
1408
1409 ensureConnValid(t, conn)
1410 }
1411
View as plain text