1 package pgx_test
2
3 import (
4 "bytes"
5 "context"
6 "log"
7 "os"
8 "strings"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/jackc/pgconn"
14 "github.com/jackc/pgconn/stmtcache"
15 "github.com/jackc/pgtype"
16 "github.com/jackc/pgx/v4"
17 "github.com/stretchr/testify/assert"
18 "github.com/stretchr/testify/require"
19 )
20
21 func TestCrateDBConnect(t *testing.T) {
22 t.Parallel()
23
24 connString := os.Getenv("PGX_TEST_CRATEDB_CONN_STRING")
25 if connString == "" {
26 t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_CRATEDB_CONN_STRING")
27 }
28
29 conn, err := pgx.Connect(context.Background(), connString)
30 require.Nil(t, err)
31 defer closeConn(t, conn)
32
33 assert.Equal(t, connString, conn.Config().ConnString())
34
35 var result int
36 err = conn.QueryRow(context.Background(), "select 1 +1").Scan(&result)
37 if err != nil {
38 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
39 }
40 if result != 2 {
41 t.Errorf("bad result: %d", result)
42 }
43 }
44
45 func TestConnect(t *testing.T) {
46 t.Parallel()
47
48 connString := os.Getenv("PGX_TEST_DATABASE")
49 config := mustParseConfig(t, connString)
50
51 conn, err := pgx.ConnectConfig(context.Background(), config)
52 if err != nil {
53 t.Fatalf("Unable to establish connection: %v", err)
54 }
55
56 assertConfigsEqual(t, config, conn.Config(), "Conn.Config() returns original config")
57
58 var currentDB string
59 err = conn.QueryRow(context.Background(), "select current_database()").Scan(¤tDB)
60 if err != nil {
61 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
62 }
63 if currentDB != config.Config.Database {
64 t.Errorf("Did not connect to specified database (%v)", config.Config.Database)
65 }
66
67 var user string
68 err = conn.QueryRow(context.Background(), "select current_user").Scan(&user)
69 if err != nil {
70 t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
71 }
72 if user != config.Config.User {
73 t.Errorf("Did not connect as specified user (%v)", config.Config.User)
74 }
75
76 err = conn.Close(context.Background())
77 if err != nil {
78 t.Fatal("Unable to close connection")
79 }
80 }
81
82 func TestConnectWithPreferSimpleProtocol(t *testing.T) {
83 t.Parallel()
84
85 connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
86 connConfig.PreferSimpleProtocol = true
87
88 conn := mustConnect(t, connConfig)
89 defer closeConn(t, conn)
90
91
92
93
94 var s pgtype.Text
95 err := conn.QueryRow(context.Background(), "select $1::int4", 42).Scan(&s)
96 if err != nil {
97 t.Fatal(err)
98 }
99
100 if s.Get() != "42" {
101 t.Fatalf(`expected "42", got %v`, s)
102 }
103
104 ensureConnValid(t, conn)
105 }
106
107 func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
108 config := &pgx.ConnConfig{}
109 require.PanicsWithValue(t, "config must be created by ParseConfig", func() {
110 pgx.ConnectConfig(context.Background(), config)
111 })
112 }
113
114 func TestConfigContainsConnStr(t *testing.T) {
115 connStr := os.Getenv("PGX_TEST_DATABASE")
116 config, err := pgx.ParseConfig(connStr)
117 require.NoError(t, err)
118 assert.Equal(t, connStr, config.ConnString())
119 }
120
121 func TestConfigCopyReturnsEqualConfig(t *testing.T) {
122 connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
123 original, err := pgx.ParseConfig(connString)
124 require.NoError(t, err)
125
126 copied := original.Copy()
127 assertConfigsEqual(t, original, copied, t.Name())
128 }
129
130 func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
131 connString := os.Getenv("PGX_TEST_DATABASE")
132 original, err := pgx.ParseConfig(connString)
133 require.NoError(t, err)
134
135 copied := original.Copy()
136 assert.NotPanics(t, func() {
137 _, err = pgx.ConnectConfig(context.Background(), copied)
138 })
139 assert.NoError(t, err)
140 }
141
142 func TestParseConfigExtractsStatementCacheOptions(t *testing.T) {
143 t.Parallel()
144
145 config, err := pgx.ParseConfig("statement_cache_capacity=0")
146 require.NoError(t, err)
147 require.Nil(t, config.BuildStatementCache)
148
149 config, err = pgx.ParseConfig("statement_cache_capacity=42")
150 require.NoError(t, err)
151 require.NotNil(t, config.BuildStatementCache)
152 c := config.BuildStatementCache(nil)
153 require.NotNil(t, c)
154 require.Equal(t, 42, c.Cap())
155 require.Equal(t, stmtcache.ModePrepare, c.Mode())
156
157 config, err = pgx.ParseConfig("statement_cache_capacity=42 statement_cache_mode=prepare")
158 require.NoError(t, err)
159 require.NotNil(t, config.BuildStatementCache)
160 c = config.BuildStatementCache(nil)
161 require.NotNil(t, c)
162 require.Equal(t, 42, c.Cap())
163 require.Equal(t, stmtcache.ModePrepare, c.Mode())
164
165 config, err = pgx.ParseConfig("statement_cache_capacity=42 statement_cache_mode=describe")
166 require.NoError(t, err)
167 require.NotNil(t, config.BuildStatementCache)
168 c = config.BuildStatementCache(nil)
169 require.NotNil(t, c)
170 require.Equal(t, 42, c.Cap())
171 require.Equal(t, stmtcache.ModeDescribe, c.Mode())
172 }
173
174 func TestParseConfigExtractsPreferSimpleProtocol(t *testing.T) {
175 t.Parallel()
176
177 for _, tt := range []struct {
178 connString string
179 preferSimpleProtocol bool
180 }{
181 {"", false},
182 {"prefer_simple_protocol=false", false},
183 {"prefer_simple_protocol=0", false},
184 {"prefer_simple_protocol=true", true},
185 {"prefer_simple_protocol=1", true},
186 } {
187 config, err := pgx.ParseConfig(tt.connString)
188 require.NoError(t, err)
189 require.Equalf(t, tt.preferSimpleProtocol, config.PreferSimpleProtocol, "connString: `%s`", tt.connString)
190 require.Empty(t, config.RuntimeParams["prefer_simple_protocol"])
191 }
192 }
193
194 func TestExec(t *testing.T) {
195 t.Parallel()
196
197 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
198 if results := mustExec(t, conn, "create temporary table foo(id integer primary key);"); string(results) != "CREATE TABLE" {
199 t.Error("Unexpected results from Exec")
200 }
201
202
203 if results := mustExec(t, conn, "insert into foo(id) values($1)", 1); string(results) != "INSERT 0 1" {
204 t.Errorf("Unexpected results from Exec: %v", results)
205 }
206
207 if results := mustExec(t, conn, "drop table foo;"); string(results) != "DROP TABLE" {
208 t.Error("Unexpected results from Exec")
209 }
210
211
212 if results := mustExec(t, conn, "create temporary table foo(id serial primary key); drop table foo;"); string(results) != "DROP TABLE" {
213 t.Error("Unexpected results from Exec")
214 }
215
216
217 if results := mustExec(t, conn, strings.Repeat("select 42; ", 1000)); string(results) != "SELECT 1" {
218 t.Errorf("Unexpected results from Exec: %v", results)
219 }
220
221
222 if results := mustExec(t, conn, "--;"); string(results) != "" {
223 t.Errorf("Unexpected results from Exec: %v", results)
224 }
225 })
226 }
227
228 func TestExecFailure(t *testing.T) {
229 t.Parallel()
230
231 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
232 if _, err := conn.Exec(context.Background(), "selct;"); err == nil {
233 t.Fatal("Expected SQL syntax error")
234 }
235
236 rows, _ := conn.Query(context.Background(), "select 1")
237 rows.Close()
238 if rows.Err() != nil {
239 t.Fatalf("Exec failure appears to have broken connection: %v", rows.Err())
240 }
241 })
242 }
243
244 func TestExecFailureWithArguments(t *testing.T) {
245 t.Parallel()
246
247 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
248 _, err := conn.Exec(context.Background(), "selct $1;", 1)
249 if err == nil {
250 t.Fatal("Expected SQL syntax error")
251 }
252 assert.False(t, pgconn.SafeToRetry(err))
253
254 _, err = conn.Exec(context.Background(), "select $1::varchar(1);", "1", "2")
255 require.Error(t, err)
256 })
257 }
258
259 func TestExecContextWithoutCancelation(t *testing.T) {
260 t.Parallel()
261
262 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
263 ctx, cancelFunc := context.WithCancel(context.Background())
264 defer cancelFunc()
265
266 commandTag, err := conn.Exec(ctx, "create temporary table foo(id integer primary key);")
267 if err != nil {
268 t.Fatal(err)
269 }
270 if string(commandTag) != "CREATE TABLE" {
271 t.Fatalf("Unexpected results from Exec: %v", commandTag)
272 }
273 assert.False(t, pgconn.SafeToRetry(err))
274 })
275 }
276
277 func TestExecContextFailureWithoutCancelation(t *testing.T) {
278 t.Parallel()
279
280 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
281 ctx, cancelFunc := context.WithCancel(context.Background())
282 defer cancelFunc()
283
284 _, err := conn.Exec(ctx, "selct;")
285 if err == nil {
286 t.Fatal("Expected SQL syntax error")
287 }
288 assert.False(t, pgconn.SafeToRetry(err))
289
290 rows, _ := conn.Query(context.Background(), "select 1")
291 rows.Close()
292 if rows.Err() != nil {
293 t.Fatalf("ExecEx failure appears to have broken connection: %v", rows.Err())
294 }
295 assert.False(t, pgconn.SafeToRetry(err))
296 })
297 }
298
299 func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
300 t.Parallel()
301
302 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
303 ctx, cancelFunc := context.WithCancel(context.Background())
304 defer cancelFunc()
305
306 _, err := conn.Exec(ctx, "selct $1;", 1)
307 if err == nil {
308 t.Fatal("Expected SQL syntax error")
309 }
310 assert.False(t, pgconn.SafeToRetry(err))
311 })
312 }
313
314 func TestExecFailureCloseBefore(t *testing.T) {
315 t.Parallel()
316
317 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
318 closeConn(t, conn)
319
320 _, err := conn.Exec(context.Background(), "select 1")
321 require.Error(t, err)
322 assert.True(t, pgconn.SafeToRetry(err))
323 }
324
325 func TestExecStatementCacheModes(t *testing.T) {
326 t.Parallel()
327
328 config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
329
330 tests := []struct {
331 name string
332 buildStatementCache pgx.BuildStatementCacheFunc
333 }{
334 {
335 name: "disabled",
336 buildStatementCache: nil,
337 },
338 {
339 name: "prepare",
340 buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
341 return stmtcache.New(conn, stmtcache.ModePrepare, 32)
342 },
343 },
344 {
345 name: "describe",
346 buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
347 return stmtcache.New(conn, stmtcache.ModeDescribe, 32)
348 },
349 },
350 }
351
352 for _, tt := range tests {
353 func() {
354 config.BuildStatementCache = tt.buildStatementCache
355 conn := mustConnect(t, config)
356 defer closeConn(t, conn)
357
358 commandTag, err := conn.Exec(context.Background(), "select 1")
359 assert.NoError(t, err, tt.name)
360 assert.Equal(t, "SELECT 1", string(commandTag), tt.name)
361
362 commandTag, err = conn.Exec(context.Background(), "select 1 union all select 1")
363 assert.NoError(t, err, tt.name)
364 assert.Equal(t, "SELECT 2", string(commandTag), tt.name)
365
366 commandTag, err = conn.Exec(context.Background(), "select 1")
367 assert.NoError(t, err, tt.name)
368 assert.Equal(t, "SELECT 1", string(commandTag), tt.name)
369
370 ensureConnValid(t, conn)
371 }()
372 }
373 }
374
375 func TestExecPerQuerySimpleProtocol(t *testing.T) {
376 t.Parallel()
377
378 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
379 defer closeConn(t, conn)
380
381 ctx, cancelFunc := context.WithCancel(context.Background())
382 defer cancelFunc()
383
384 commandTag, err := conn.Exec(ctx, "create temporary table foo(name varchar primary key);")
385 if err != nil {
386 t.Fatal(err)
387 }
388 if string(commandTag) != "CREATE TABLE" {
389 t.Fatalf("Unexpected results from Exec: %v", commandTag)
390 }
391
392 commandTag, err = conn.Exec(ctx,
393 "insert into foo(name) values($1);",
394 pgx.QuerySimpleProtocol(true),
395 "bar'; drop table foo;--",
396 )
397 if err != nil {
398 t.Fatal(err)
399 }
400 if string(commandTag) != "INSERT 0 1" {
401 t.Fatalf("Unexpected results from Exec: %v", commandTag)
402 }
403
404 }
405
406 func TestPrepare(t *testing.T) {
407 t.Parallel()
408
409 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
410 defer closeConn(t, conn)
411
412 _, err := conn.Prepare(context.Background(), "test", "select $1::varchar")
413 if err != nil {
414 t.Errorf("Unable to prepare statement: %v", err)
415 return
416 }
417
418 var s string
419 err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
420 if err != nil {
421 t.Errorf("Executing prepared statement failed: %v", err)
422 }
423
424 if s != "hello" {
425 t.Errorf("Prepared statement did not return expected value: %v", s)
426 }
427
428 err = conn.Deallocate(context.Background(), "test")
429 if err != nil {
430 t.Errorf("conn.Deallocate failed: %v", err)
431 }
432
433
434
435
436 _, err = conn.Prepare(context.Background(), "test", "select $1::integer")
437 if err != nil {
438 t.Errorf("Unable to prepare statement: %v", err)
439 return
440 }
441
442 var n int32
443 err = conn.QueryRow(context.Background(), "test", int32(1)).Scan(&n)
444 if err != nil {
445 t.Errorf("Executing prepared statement failed: %v", err)
446 }
447
448 if n != 1 {
449 t.Errorf("Prepared statement did not return expected value: %v", s)
450 }
451
452 err = conn.Deallocate(context.Background(), "test")
453 if err != nil {
454 t.Errorf("conn.Deallocate failed: %v", err)
455 }
456 }
457
458 func TestPrepareBadSQLFailure(t *testing.T) {
459 t.Parallel()
460
461 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
462 defer closeConn(t, conn)
463
464 if _, err := conn.Prepare(context.Background(), "badSQL", "select foo"); err == nil {
465 t.Fatal("Prepare should have failed with syntax error")
466 }
467
468 ensureConnValid(t, conn)
469 }
470
471 func TestPrepareIdempotency(t *testing.T) {
472 t.Parallel()
473
474 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
475 defer closeConn(t, conn)
476
477 for i := 0; i < 2; i++ {
478 _, err := conn.Prepare(context.Background(), "test", "select 42::integer")
479 if err != nil {
480 t.Fatalf("%d. Unable to prepare statement: %v", i, err)
481 }
482
483 var n int32
484 err = conn.QueryRow(context.Background(), "test").Scan(&n)
485 if err != nil {
486 t.Errorf("%d. Executing prepared statement failed: %v", i, err)
487 }
488
489 if n != int32(42) {
490 t.Errorf("%d. Prepared statement did not return expected value: %v", i, n)
491 }
492 }
493
494 _, err := conn.Prepare(context.Background(), "test", "select 'fail'::varchar")
495 if err == nil {
496 t.Fatalf("Prepare statement with same name but different SQL should have failed but it didn't")
497 return
498 }
499 }
500
501 func TestPrepareStatementCacheModes(t *testing.T) {
502 t.Parallel()
503
504 config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
505
506 tests := []struct {
507 name string
508 buildStatementCache pgx.BuildStatementCacheFunc
509 }{
510 {
511 name: "disabled",
512 buildStatementCache: nil,
513 },
514 {
515 name: "prepare",
516 buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
517 return stmtcache.New(conn, stmtcache.ModePrepare, 32)
518 },
519 },
520 {
521 name: "describe",
522 buildStatementCache: func(conn *pgconn.PgConn) stmtcache.Cache {
523 return stmtcache.New(conn, stmtcache.ModeDescribe, 32)
524 },
525 },
526 }
527
528 for _, tt := range tests {
529 t.Run(tt.name, func(t *testing.T) {
530 config.BuildStatementCache = tt.buildStatementCache
531 conn := mustConnect(t, config)
532 defer closeConn(t, conn)
533
534 _, err := conn.Prepare(context.Background(), "test", "select $1::text")
535 require.NoError(t, err)
536
537 var s string
538 err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
539 require.NoError(t, err)
540 require.Equal(t, "hello", s)
541 })
542 }
543 }
544
// TestListenNotify exercises LISTEN / NOTIFY across two connections: a
// listener subscribed to the "chat" channel and a notifier that sends to it.
// It covers four situations in sequence; see the inline comments.
func TestListenNotify(t *testing.T) {
	t.Parallel()

	listener := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, listener)

	// A non-empty crdb_version parameter status is taken to mean the server
	// is CockroachDB, for which this test is skipped.
	if listener.PgConn().ParameterStatus("crdb_version") != "" {
		t.Skip("Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
	}

	mustExec(t, listener, "listen chat")

	notifier := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, notifier)

	mustExec(t, notifier, "notify chat")

	// 1. A notification sent before the listener starts waiting must be
	// returned by WaitForNotification.
	notification, err := listener.WaitForNotification(context.Background())
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)

	// 2. A notification is sent and the listener then runs an unrelated
	// query. The subsequent wait uses an already-cancelled context and is
	// still expected to return the notification without error.
	mustExec(t, notifier, "notify chat")
	rows, _ := listener.Query(context.Background(), "select 1")
	rows.Close()
	require.NoError(t, rows.Err())

	ctx, cancelFn := context.WithCancel(context.Background())
	cancelFn()
	notification, err = listener.WaitForNotification(ctx)
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)

	// 3. Nothing has been sent since the last wait, so a wait bounded by a
	// one-millisecond timeout is expected to fail with a timeout error.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	notification, err = listener.WaitForNotification(ctx)
	assert.True(t, pgconn.Timeout(err))

	// 4. After the timed-out wait, a new notification must still be
	// delivered normally.
	mustExec(t, notifier, "notify chat")
	notification, err = listener.WaitForNotification(context.Background())
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)
}
591
592 func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
593 t.Parallel()
594
595 func() {
596 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
597 defer closeConn(t, conn)
598 skipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
599 }()
600
601 listenerDone := make(chan bool)
602 notifierDone := make(chan bool)
603 go func() {
604 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
605 defer closeConn(t, conn)
606 defer func() {
607 listenerDone <- true
608 }()
609
610 mustExec(t, conn, "listen busysafe")
611
612 for i := 0; i < 5000; i++ {
613 var sum int32
614 var rowCount int32
615
616 rows, err := conn.Query(context.Background(), "select generate_series(1,$1)", 100)
617 if err != nil {
618 t.Errorf("conn.Query failed: %v", err)
619 return
620 }
621
622 for rows.Next() {
623 var n int32
624 if err := rows.Scan(&n); err != nil {
625 t.Errorf("Row scan failed: %v", err)
626 return
627 }
628 sum += n
629 rowCount++
630 }
631
632 if rows.Err() != nil {
633 t.Errorf("conn.Query failed: %v", err)
634 return
635 }
636
637 if sum != 5050 {
638 t.Errorf("Wrong rows sum: %v", sum)
639 return
640 }
641
642 if rowCount != 100 {
643 t.Errorf("Wrong number of rows: %v", rowCount)
644 return
645 }
646
647 time.Sleep(1 * time.Microsecond)
648 }
649 }()
650
651 go func() {
652 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
653 defer closeConn(t, conn)
654 defer func() {
655 notifierDone <- true
656 }()
657
658 for i := 0; i < 100000; i++ {
659 mustExec(t, conn, "notify busysafe, 'hello'")
660 time.Sleep(1 * time.Microsecond)
661 }
662 }()
663
664 <-listenerDone
665 <-notifierDone
666 }
667
668 func TestListenNotifySelfNotification(t *testing.T) {
669 t.Parallel()
670
671 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
672 defer closeConn(t, conn)
673
674 skipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
675
676 mustExec(t, conn, "listen self")
677
678
679 mustExec(t, conn, "notify self")
680
681 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
682 defer cancel()
683 notification, err := conn.WaitForNotification(ctx)
684 require.NoError(t, err)
685 assert.Equal(t, "self", notification.Channel)
686
687
688 mustExec(t, conn, "notify self")
689
690 rows, _ := conn.Query(context.Background(), "select 1")
691 rows.Close()
692 if rows.Err() != nil {
693 t.Fatalf("Unexpected error on Query: %v", rows.Err())
694 }
695
696 ctx, cncl := context.WithTimeout(context.Background(), time.Second)
697 defer cncl()
698 notification, err = conn.WaitForNotification(ctx)
699 require.NoError(t, err)
700 assert.Equal(t, "self", notification.Channel)
701 }
702
703 func TestFatalRxError(t *testing.T) {
704 t.Parallel()
705
706 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
707 defer closeConn(t, conn)
708
709 skipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
710
711 var wg sync.WaitGroup
712 wg.Add(1)
713 go func() {
714 defer wg.Done()
715 var n int32
716 var s string
717 err := conn.QueryRow(context.Background(), "select 1::int4, pg_sleep(10)::varchar").Scan(&n, &s)
718 if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Severity == "FATAL" {
719 } else {
720 t.Errorf("Expected QueryRow Scan to return fatal PgError, but instead received %v", err)
721 return
722 }
723 }()
724
725 otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
726 defer otherConn.Close(context.Background())
727
728 if _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID()); err != nil {
729 t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
730 }
731
732 wg.Wait()
733
734 if !conn.IsClosed() {
735 t.Fatal("Connection should be closed")
736 }
737 }
738
739 func TestFatalTxError(t *testing.T) {
740 t.Parallel()
741
742
743 for i := 0; i < 50; i++ {
744 func() {
745 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
746 defer closeConn(t, conn)
747
748 skipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
749
750 otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
751 defer otherConn.Close(context.Background())
752
753 _, err := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
754 if err != nil {
755 t.Fatalf("Unable to kill backend PostgreSQL process: %v", err)
756 }
757
758 err = conn.QueryRow(context.Background(), "select 1").Scan(nil)
759 if err == nil {
760 t.Fatal("Expected error but none occurred")
761 }
762
763 if !conn.IsClosed() {
764 t.Fatalf("Connection should be closed but isn't. Previous Query err: %v", err)
765 }
766 }()
767 }
768 }
769
770 func TestInsertBoolArray(t *testing.T) {
771 t.Parallel()
772
773 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
774 if results := mustExec(t, conn, "create temporary table foo(spice bool[]);"); string(results) != "CREATE TABLE" {
775 t.Error("Unexpected results from Exec")
776 }
777
778
779 if results := mustExec(t, conn, "insert into foo(spice) values($1)", []bool{true, false, true}); string(results) != "INSERT 0 1" {
780 t.Errorf("Unexpected results from Exec: %v", results)
781 }
782 })
783 }
784
785 func TestInsertTimestampArray(t *testing.T) {
786 t.Parallel()
787
788 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
789 if results := mustExec(t, conn, "create temporary table foo(spice timestamp[]);"); string(results) != "CREATE TABLE" {
790 t.Error("Unexpected results from Exec")
791 }
792
793
794 if results := mustExec(t, conn, "insert into foo(spice) values($1)", []time.Time{time.Unix(1419143667, 0), time.Unix(1419143672, 0)}); string(results) != "INSERT 0 1" {
795 t.Errorf("Unexpected results from Exec: %v", results)
796 }
797 })
798 }
799
800 type testLog struct {
801 lvl pgx.LogLevel
802 msg string
803 data map[string]interface{}
804 }
805
806 type testLogger struct {
807 logs []testLog
808 }
809
810 func (l *testLogger) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
811 data["ctxdata"] = ctx.Value("ctxdata")
812 l.logs = append(l.logs, testLog{lvl: level, msg: msg, data: data})
813 }
814
815 func TestLogPassesContext(t *testing.T) {
816 t.Parallel()
817
818 l1 := &testLogger{}
819 config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
820 config.Logger = l1
821
822 conn := mustConnect(t, config)
823 defer closeConn(t, conn)
824
825 l1.logs = l1.logs[0:0]
826
827 ctx := context.WithValue(context.Background(), "ctxdata", "foo")
828
829 if _, err := conn.Exec(ctx, ";"); err != nil {
830 t.Fatal(err)
831 }
832
833 if len(l1.logs) != 1 {
834 t.Fatal("Expected logger to be called once, but it wasn't")
835 }
836
837 if l1.logs[0].data["ctxdata"] != "foo" {
838 t.Fatal("Expected context data to be passed to logger, but it wasn't")
839 }
840 }
841
842 func TestLoggerFunc(t *testing.T) {
843 t.Parallel()
844
845 const testMsg = "foo"
846
847 buf := bytes.Buffer{}
848 logger := log.New(&buf, "", 0)
849
850 createAdapterFn := func(logger *log.Logger) pgx.LoggerFunc {
851 return func(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
852 logger.Printf("%s", testMsg)
853 }
854 }
855
856 config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
857 config.Logger = createAdapterFn(logger)
858
859 conn := mustConnect(t, config)
860 defer closeConn(t, conn)
861
862 buf.Reset()
863
864 if _, err := conn.Exec(context.TODO(), ";"); err != nil {
865 t.Fatal(err)
866 }
867
868 if strings.TrimSpace(buf.String()) != testMsg {
869 t.Errorf("Expected logger function to return '%s', but it was '%s'", testMsg, buf.String())
870 }
871 }
872
873 func TestIdentifierSanitize(t *testing.T) {
874 t.Parallel()
875
876 tests := []struct {
877 ident pgx.Identifier
878 expected string
879 }{
880 {
881 ident: pgx.Identifier{`foo`},
882 expected: `"foo"`,
883 },
884 {
885 ident: pgx.Identifier{`select`},
886 expected: `"select"`,
887 },
888 {
889 ident: pgx.Identifier{`foo`, `bar`},
890 expected: `"foo"."bar"`,
891 },
892 {
893 ident: pgx.Identifier{`you should " not do this`},
894 expected: `"you should "" not do this"`,
895 },
896 {
897 ident: pgx.Identifier{`you should " not do this`, `please don't`},
898 expected: `"you should "" not do this"."please don't"`,
899 },
900 {
901 ident: pgx.Identifier{`you should ` + string([]byte{0}) + `not do this`},
902 expected: `"you should not do this"`,
903 },
904 }
905
906 for i, tt := range tests {
907 qval := tt.ident.Sanitize()
908 if qval != tt.expected {
909 t.Errorf("%d. Expected Sanitize %v to return %v but it was %v", i, tt.ident, tt.expected, qval)
910 }
911 }
912 }
913
914 func TestConnInitConnInfo(t *testing.T) {
915 conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
916 defer closeConn(t, conn)
917
918
919 nameOIDs := map[string]uint32{
920 "_int8": pgtype.Int8ArrayOID,
921 "int8": pgtype.Int8OID,
922 "json": pgtype.JSONOID,
923 "text": pgtype.TextOID,
924 }
925 for name, oid := range nameOIDs {
926 dtByName, ok := conn.ConnInfo().DataTypeForName(name)
927 if !ok {
928 t.Fatalf("Expected type named %v to be present", name)
929 }
930 dtByOID, ok := conn.ConnInfo().DataTypeForOID(oid)
931 if !ok {
932 t.Fatalf("Expected type OID %v to be present", oid)
933 }
934 if dtByName != dtByOID {
935 t.Fatalf("Expected type named %v to be the same as type OID %v", name, oid)
936 }
937 }
938
939 ensureConnValid(t, conn)
940 }
941
942 func TestUnregisteredTypeUsableAsStringArgumentAndBaseResult(t *testing.T) {
943 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
944 skipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
945
946 var n uint64
947 err := conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
948 if err != nil {
949 t.Fatal(err)
950 }
951
952 if n != 42 {
953 t.Fatalf("Expected n to be 42, but was %v", n)
954 }
955 })
956 }
957
958 func TestDomainType(t *testing.T) {
959 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
960 skipCockroachDB(t, conn, "Server does support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")
961
962 var n uint64
963
964
965
966 err := conn.QueryRow(context.Background(), "select $1::uint64", uint64(24)).Scan(&n)
967 require.NoError(t, err)
968
969
970
971 err = conn.QueryRow(context.Background(), "select $1::uint64", "42").Scan(&n)
972 if err != nil {
973 t.Fatal(err)
974 }
975 if n != 42 {
976 t.Fatalf("Expected n to be 42, but was %v", n)
977 }
978
979 var uint64OID uint32
980 err = conn.QueryRow(context.Background(), "select t.oid from pg_type t where t.typname='uint64';").Scan(&uint64OID)
981 if err != nil {
982 t.Fatalf("did not find uint64 OID, %v", err)
983 }
984 conn.ConnInfo().RegisterDataType(pgtype.DataType{Value: &pgtype.Numeric{}, Name: "uint64", OID: uint64OID})
985
986
987 err = conn.QueryRow(context.Background(), "select $1::uint64", "7").Scan(&n)
988 if err != nil {
989 t.Fatal(err)
990 }
991 if n != 7 {
992 t.Fatalf("Expected n to be 7, but was %v", n)
993 }
994
995
996 err = conn.QueryRow(context.Background(), "select $1::uint64", uint64(24)).Scan(&n)
997 if err != nil {
998 t.Fatal(err)
999 }
1000 if n != 24 {
1001 t.Fatalf("Expected n to be 24, but was %v", n)
1002 }
1003 })
1004 }
1005
// TestStmtCacheInvalidationConn checks how a connection behaves when a
// table's columns change between two executions of the same query text: the
// first re-execution must fail with a "cached plan must not change result
// type" error, and the next one must succeed.
func TestStmtCacheInvalidationConn(t *testing.T) {
	ctx := context.Background()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	// Create the table and insert one row.
	_, err := conn.Exec(ctx, `
		DROP TABLE IF EXISTS drop_cols;
		CREATE TABLE drop_cols (
			id SERIAL PRIMARY KEY NOT NULL,
			f1 int NOT NULL,
			f2 int NOT NULL
		);
	`)
	require.NoError(t, err)
	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
	require.NoError(t, err)

	getSQL := "SELECT * FROM drop_cols WHERE id = $1"

	// Run the query once; the rows are discarded. Presumably this is what
	// places the statement in the connection's statement cache.
	rows, err := conn.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Close()

	// Change the table's columns so the result type of "SELECT *" differs.
	_, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
	require.NoError(t, err)

	// The first re-execution is expected to fail. Query itself returns no
	// error; the failure must be visible through rows.Err() both after
	// Next() and after Close().
	rows, err = conn.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	nextErr := rows.Err()
	rows.Close()
	for _, err := range []error{nextErr, rows.Err()} {
		if err == nil {
			t.Fatal("expected InvalidCachedStatementPlanError: no error")
		}
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
		}
	}

	// A second attempt with the same SQL is expected to succeed.
	rows, err = conn.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	err = rows.Err()
	require.NoError(t, err)
	rows.Close()
	require.NoError(t, rows.Err())

	ensureConnValid(t, conn)
}
1066
// TestStmtCacheInvalidationTx is the transactional counterpart of
// TestStmtCacheInvalidationConn: the table's columns change inside a
// transaction between two executions of the same query text. The first
// re-execution must fail with a "cached plan must not change result type"
// error, a further attempt in the same transaction must also fail, and the
// query must succeed again after the transaction is rolled back.
func TestStmtCacheInvalidationTx(t *testing.T) {
	ctx := context.Background()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	// Create the table and insert one row.
	_, err := conn.Exec(ctx, `
		DROP TABLE IF EXISTS drop_cols;
		CREATE TABLE drop_cols (
			id SERIAL PRIMARY KEY NOT NULL,
			f1 int NOT NULL,
			f2 int NOT NULL
		);
	`)
	require.NoError(t, err)
	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
	require.NoError(t, err)

	tx, err := conn.Begin(ctx)
	require.NoError(t, err)

	getSQL := "SELECT * FROM drop_cols WHERE id = $1"

	// Run the query once; the rows are discarded. Presumably this is what
	// places the statement in the connection's statement cache.
	rows, err := tx.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Close()

	// Change the table's columns so the result type of "SELECT *" differs.
	_, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
	require.NoError(t, err)

	// The first re-execution is expected to fail. Query itself returns no
	// error; the failure must be visible through rows.Err() both after
	// Next() and after Close().
	rows, err = tx.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	nextErr := rows.Err()
	rows.Close()
	for _, err := range []error{nextErr, rows.Err()} {
		if err == nil {
			t.Fatal("expected InvalidCachedStatementPlanError: no error")
		}
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
		}
	}

	rows, err = tx.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	err = rows.Err()
	// Trying again inside the same transaction is still expected to fail.
	require.Error(t, err)
	rows.Close()

	err = tx.Rollback(ctx)
	require.NoError(t, err)

	// After the rollback, the same query on the connection is expected to
	// succeed.
	rows, err = conn.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	err = rows.Err()
	require.NoError(t, err)
	rows.Close()

	ensureConnValid(t, conn)
}
1141
1142 func TestInsertDurationInterval(t *testing.T) {
1143 testWithAndWithoutPreferSimpleProtocol(t, func(t *testing.T, conn *pgx.Conn) {
1144 _, err := conn.Exec(context.Background(), "create temporary table t(duration INTERVAL(0) NOT NULL)")
1145 require.NoError(t, err)
1146
1147 result, err := conn.Exec(context.Background(), "insert into t(duration) values($1)", time.Minute)
1148 require.NoError(t, err)
1149
1150 n := result.RowsAffected()
1151 require.EqualValues(t, 1, n)
1152 })
1153 }
1154
View as plain text