1 package pgxpool_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "sync/atomic"
9 "testing"
10 "time"
11
12 "github.com/jackc/pgx/v4"
13 "github.com/jackc/pgx/v4/pgxpool"
14 "github.com/stretchr/testify/assert"
15 "github.com/stretchr/testify/require"
16 )
17
18 func TestConnect(t *testing.T) {
19 t.Parallel()
20 connString := os.Getenv("PGX_TEST_DATABASE")
21 pool, err := pgxpool.Connect(context.Background(), connString)
22 require.NoError(t, err)
23 assert.Equal(t, connString, pool.Config().ConnString())
24 pool.Close()
25 }
26
27 func TestConnectConfig(t *testing.T) {
28 t.Parallel()
29 connString := os.Getenv("PGX_TEST_DATABASE")
30 config, err := pgxpool.ParseConfig(connString)
31 require.NoError(t, err)
32 pool, err := pgxpool.ConnectConfig(context.Background(), config)
33 require.NoError(t, err)
34 assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config")
35 pool.Close()
36 }
37
38 func TestParseConfigExtractsPoolArguments(t *testing.T) {
39 t.Parallel()
40
41 config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1")
42 assert.NoError(t, err)
43 assert.EqualValues(t, 42, config.MaxConns)
44 assert.EqualValues(t, 1, config.MinConns)
45 assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
46 assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
47 }
48
49 func TestConnectCancel(t *testing.T) {
50 t.Parallel()
51
52 ctx, cancel := context.WithCancel(context.Background())
53 cancel()
54 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
55 assert.Nil(t, pool)
56 assert.Equal(t, context.Canceled, err)
57 }
58
59 func TestLazyConnect(t *testing.T) {
60 t.Parallel()
61
62 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
63 assert.NoError(t, err)
64 config.LazyConnect = true
65
66 ctx, cancel := context.WithCancel(context.Background())
67 cancel()
68
69 pool, err := pgxpool.ConnectConfig(ctx, config)
70 assert.NoError(t, err)
71
72 _, err = pool.Exec(ctx, "SELECT 1")
73 assert.Equal(t, context.Canceled, err)
74 }
75
76 func TestBeforeConnectWithContextWithValueAndOneMinConn(t *testing.T) {
77 t.Parallel()
78
79 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
80 assert.NoError(t, err)
81 config.MinConns = 1
82 config.BeforeConnect = func(ctx context.Context, config *pgx.ConnConfig) error {
83 val := ctx.Value("key")
84 if val == nil {
85 return errors.New("no value found with key 'key'")
86 }
87 return nil
88 }
89
90 ctx := context.WithValue(context.Background(), "key", "value")
91
92 _, err = pgxpool.ConnectConfig(ctx, config)
93 assert.NoError(t, err)
94 }
95
96 func TestConstructorIgnoresContext(t *testing.T) {
97 t.Parallel()
98
99 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
100 assert.NoError(t, err)
101 config.LazyConnect = true
102 var cancel func()
103 config.BeforeConnect = func(context.Context, *pgx.ConnConfig) error {
104
105
106 cancel()
107 return nil
108 }
109
110 pool, err := pgxpool.ConnectConfig(context.Background(), config)
111 require.NoError(t, err)
112
113 assert.EqualValues(t, 0, pool.Stat().TotalConns())
114
115 var ctx context.Context
116 ctx, cancel = context.WithCancel(context.Background())
117 defer cancel()
118 _, err = pool.Exec(ctx, "SELECT 1")
119 assert.ErrorIs(t, err, context.Canceled)
120 assert.EqualValues(t, 1, pool.Stat().TotalConns())
121 }
122
123 func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
124 t.Parallel()
125
126 config := &pgxpool.Config{}
127
128 require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.ConnectConfig(context.Background(), config) })
129 }
130
131 func TestConfigCopyReturnsEqualConfig(t *testing.T) {
132 connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
133 original, err := pgxpool.ParseConfig(connString)
134 require.NoError(t, err)
135
136 copied := original.Copy()
137
138 assertConfigsEqual(t, original, copied, t.Name())
139 }
140
141 func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
142 connString := os.Getenv("PGX_TEST_DATABASE")
143 original, err := pgxpool.ParseConfig(connString)
144 require.NoError(t, err)
145
146 copied := original.Copy()
147 assert.NotPanics(t, func() {
148 _, err = pgxpool.ConnectConfig(context.Background(), copied)
149 })
150 assert.NoError(t, err)
151 }
152
153 func TestPoolAcquireAndConnRelease(t *testing.T) {
154 t.Parallel()
155
156 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
157 require.NoError(t, err)
158 defer pool.Close()
159
160 c, err := pool.Acquire(context.Background())
161 require.NoError(t, err)
162 c.Release()
163 }
164
165 func TestPoolAcquireAndConnHijack(t *testing.T) {
166 t.Parallel()
167
168 ctx := context.Background()
169
170 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
171 require.NoError(t, err)
172 defer pool.Close()
173
174 c, err := pool.Acquire(ctx)
175 require.NoError(t, err)
176
177 connsBeforeHijack := pool.Stat().TotalConns()
178
179 conn := c.Hijack()
180 defer conn.Close(ctx)
181
182 connsAfterHijack := pool.Stat().TotalConns()
183 require.Equal(t, connsBeforeHijack-1, connsAfterHijack)
184
185 var n int32
186 err = conn.QueryRow(ctx, `select 1`).Scan(&n)
187 require.NoError(t, err)
188 require.Equal(t, int32(1), n)
189 }
190
191 func TestPoolAcquireFunc(t *testing.T) {
192 t.Parallel()
193
194 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
195 require.NoError(t, err)
196 defer pool.Close()
197
198 var n int32
199 err = pool.AcquireFunc(context.Background(), func(c *pgxpool.Conn) error {
200 return c.QueryRow(context.Background(), "select 1").Scan(&n)
201 })
202 require.NoError(t, err)
203 require.EqualValues(t, 1, n)
204 }
205
206 func TestPoolAcquireFuncReturnsFnError(t *testing.T) {
207 t.Parallel()
208
209 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
210 require.NoError(t, err)
211 defer pool.Close()
212
213 err = pool.AcquireFunc(context.Background(), func(c *pgxpool.Conn) error {
214 return fmt.Errorf("some error")
215 })
216 require.EqualError(t, err, "some error")
217 }
218
219 func TestPoolBeforeConnect(t *testing.T) {
220 t.Parallel()
221
222 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
223 require.NoError(t, err)
224
225 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
226 cfg.Config.RuntimeParams["application_name"] = "pgx"
227 return nil
228 }
229
230 db, err := pgxpool.ConnectConfig(context.Background(), config)
231 require.NoError(t, err)
232 defer db.Close()
233
234 var str string
235 err = db.QueryRow(context.Background(), "SHOW application_name").Scan(&str)
236 require.NoError(t, err)
237 assert.EqualValues(t, "pgx", str)
238 }
239
240 func TestPoolAfterConnect(t *testing.T) {
241 t.Parallel()
242
243 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
244 require.NoError(t, err)
245
246 config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
247 _, err := c.Prepare(ctx, "ps1", "select 1")
248 return err
249 }
250
251 db, err := pgxpool.ConnectConfig(context.Background(), config)
252 require.NoError(t, err)
253 defer db.Close()
254
255 var n int32
256 err = db.QueryRow(context.Background(), "ps1").Scan(&n)
257 require.NoError(t, err)
258 assert.EqualValues(t, 1, n)
259 }
260
261 func TestPoolBeforeAcquire(t *testing.T) {
262 t.Parallel()
263
264 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
265 require.NoError(t, err)
266
267 acquireAttempts := 0
268
269 config.BeforeAcquire = func(ctx context.Context, c *pgx.Conn) bool {
270 acquireAttempts++
271 return acquireAttempts%2 == 0
272 }
273
274 db, err := pgxpool.ConnectConfig(context.Background(), config)
275 require.NoError(t, err)
276 defer db.Close()
277
278 conns := make([]*pgxpool.Conn, 4)
279 for i := range conns {
280 conns[i], err = db.Acquire(context.Background())
281 assert.NoError(t, err)
282 }
283
284 for _, c := range conns {
285 c.Release()
286 }
287 waitForReleaseToComplete()
288
289 assert.EqualValues(t, 8, acquireAttempts)
290
291 conns = db.AcquireAllIdle(context.Background())
292 assert.Len(t, conns, 2)
293
294 for _, c := range conns {
295 c.Release()
296 }
297 waitForReleaseToComplete()
298
299 assert.EqualValues(t, 12, acquireAttempts)
300 }
301
302 func TestPoolAfterRelease(t *testing.T) {
303 t.Parallel()
304
305 func() {
306 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
307 require.NoError(t, err)
308 defer pool.Close()
309
310 err = pool.AcquireFunc(context.Background(), func(conn *pgxpool.Conn) error {
311 if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
312 t.Skip("Server does not support backend PID")
313 }
314 return nil
315 })
316 require.NoError(t, err)
317 }()
318
319 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
320 require.NoError(t, err)
321
322 afterReleaseCount := 0
323
324 config.AfterRelease = func(c *pgx.Conn) bool {
325 afterReleaseCount++
326 return afterReleaseCount%2 == 1
327 }
328
329 db, err := pgxpool.ConnectConfig(context.Background(), config)
330 require.NoError(t, err)
331 defer db.Close()
332
333 connPIDs := map[uint32]struct{}{}
334
335 for i := 0; i < 10; i++ {
336 conn, err := db.Acquire(context.Background())
337 assert.NoError(t, err)
338 connPIDs[conn.Conn().PgConn().PID()] = struct{}{}
339 conn.Release()
340 waitForReleaseToComplete()
341 }
342
343 assert.EqualValues(t, 5, len(connPIDs))
344 }
345
346 func TestPoolAcquireAllIdle(t *testing.T) {
347 t.Parallel()
348
349 db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
350 require.NoError(t, err)
351 defer db.Close()
352
353 conns := db.AcquireAllIdle(context.Background())
354 assert.Len(t, conns, 1)
355
356 for _, c := range conns {
357 c.Release()
358 }
359 waitForReleaseToComplete()
360
361 conns = make([]*pgxpool.Conn, 3)
362 for i := range conns {
363 conns[i], err = db.Acquire(context.Background())
364 assert.NoError(t, err)
365 }
366
367 for _, c := range conns {
368 if c != nil {
369 c.Release()
370 }
371 }
372 waitForReleaseToComplete()
373
374 conns = db.AcquireAllIdle(context.Background())
375 assert.Len(t, conns, 3)
376
377 for _, c := range conns {
378 c.Release()
379 }
380 }
381
382 func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
383 t.Parallel()
384
385 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
386 require.NoError(t, err)
387
388 config.MaxConnLifetime = 250 * time.Millisecond
389
390 db, err := pgxpool.ConnectConfig(context.Background(), config)
391 require.NoError(t, err)
392 defer db.Close()
393
394 c, err := db.Acquire(context.Background())
395 require.NoError(t, err)
396
397 time.Sleep(config.MaxConnLifetime)
398
399 c.Release()
400 waitForReleaseToComplete()
401
402 stats := db.Stat()
403 assert.EqualValues(t, 0, stats.TotalConns())
404 }
405
406 func TestConnReleaseClosesBusyConn(t *testing.T) {
407 t.Parallel()
408
409 db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
410 require.NoError(t, err)
411 defer db.Close()
412
413 c, err := db.Acquire(context.Background())
414 require.NoError(t, err)
415
416 _, err = c.Query(context.Background(), "select generate_series(1,10)")
417 require.NoError(t, err)
418
419 c.Release()
420 waitForReleaseToComplete()
421
422
423 for i := 0; i < 1000; i++ {
424 if db.Stat().TotalConns() == 0 {
425 break
426 }
427 time.Sleep(time.Millisecond)
428 }
429
430 stats := db.Stat()
431 assert.EqualValues(t, 0, stats.TotalConns())
432 }
433
434 func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
435 t.Parallel()
436
437 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
438 require.NoError(t, err)
439
440 config.MaxConnLifetime = 100 * time.Millisecond
441 config.HealthCheckPeriod = 100 * time.Millisecond
442
443 db, err := pgxpool.ConnectConfig(context.Background(), config)
444 require.NoError(t, err)
445 defer db.Close()
446
447 c, err := db.Acquire(context.Background())
448 require.NoError(t, err)
449 c.Release()
450 time.Sleep(config.MaxConnLifetime + 500*time.Millisecond)
451
452 stats := db.Stat()
453 assert.EqualValues(t, 0, stats.TotalConns())
454 assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
455 assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount())
456 }
457
458 func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
459 t.Parallel()
460
461 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
462 require.NoError(t, err)
463
464 config.MaxConnLifetime = 1 * time.Minute
465 config.MaxConnIdleTime = 100 * time.Millisecond
466 config.HealthCheckPeriod = 150 * time.Millisecond
467
468 db, err := pgxpool.ConnectConfig(context.Background(), config)
469 require.NoError(t, err)
470 defer db.Close()
471
472 c, err := db.Acquire(context.Background())
473 require.NoError(t, err)
474 c.Release()
475 time.Sleep(config.HealthCheckPeriod)
476
477 for i := 0; i < 1000; i++ {
478 if db.Stat().TotalConns() == 0 {
479 break
480 }
481 time.Sleep(time.Millisecond)
482 }
483
484 stats := db.Stat()
485 assert.EqualValues(t, 0, stats.TotalConns())
486 assert.EqualValues(t, 1, stats.MaxIdleDestroyCount())
487 assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
488 }
489
490 func TestPoolBackgroundChecksMinConns(t *testing.T) {
491 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
492 require.NoError(t, err)
493
494 config.HealthCheckPeriod = 100 * time.Millisecond
495 config.MinConns = 2
496
497 db, err := pgxpool.ConnectConfig(context.Background(), config)
498 require.NoError(t, err)
499 defer db.Close()
500
501 time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond)
502
503 stats := db.Stat()
504 assert.EqualValues(t, 2, stats.TotalConns())
505 assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
506 assert.EqualValues(t, 2, stats.NewConnsCount())
507
508 c, err := db.Acquire(context.Background())
509 require.NoError(t, err)
510 err = c.Conn().Close(context.Background())
511 require.NoError(t, err)
512 c.Release()
513
514 time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond)
515
516 stats = db.Stat()
517 assert.EqualValues(t, 2, stats.TotalConns())
518 assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
519 assert.EqualValues(t, 3, stats.NewConnsCount())
520 }
521
522 func TestPoolExec(t *testing.T) {
523 t.Parallel()
524
525 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
526 require.NoError(t, err)
527 defer pool.Close()
528
529 testExec(t, pool)
530 }
531
532 func TestPoolQuery(t *testing.T) {
533 t.Parallel()
534
535 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
536 require.NoError(t, err)
537 defer pool.Close()
538
539
540 testQuery(t, pool)
541 waitForReleaseToComplete()
542
543
544 rows, err := pool.Query(context.Background(), "select generate_series(1,$1)", 10)
545 require.NoError(t, err)
546
547 stats := pool.Stat()
548 assert.EqualValues(t, 1, stats.AcquiredConns())
549 assert.EqualValues(t, 1, stats.TotalConns())
550
551 rows.Close()
552 assert.NoError(t, rows.Err())
553 waitForReleaseToComplete()
554
555 stats = pool.Stat()
556 assert.EqualValues(t, 0, stats.AcquiredConns())
557 assert.EqualValues(t, 1, stats.TotalConns())
558
559 }
560
561 func TestPoolQueryRow(t *testing.T) {
562 t.Parallel()
563
564 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
565 require.NoError(t, err)
566 defer pool.Close()
567
568 testQueryRow(t, pool)
569 waitForReleaseToComplete()
570
571 stats := pool.Stat()
572 assert.EqualValues(t, 0, stats.AcquiredConns())
573 assert.EqualValues(t, 1, stats.TotalConns())
574 }
575
576
577 func TestPoolQueryRowErrNoRows(t *testing.T) {
578 t.Parallel()
579
580 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
581 require.NoError(t, err)
582 defer pool.Close()
583
584 err = pool.QueryRow(context.Background(), "select n from generate_series(1,10) n where n=0").Scan(nil)
585 require.Equal(t, pgx.ErrNoRows, err)
586 }
587
588 func TestPoolSendBatch(t *testing.T) {
589 t.Parallel()
590
591 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
592 require.NoError(t, err)
593 defer pool.Close()
594
595 testSendBatch(t, pool)
596 waitForReleaseToComplete()
597
598 stats := pool.Stat()
599 assert.EqualValues(t, 0, stats.AcquiredConns())
600 assert.EqualValues(t, 1, stats.TotalConns())
601 }
602
603 func TestPoolCopyFrom(t *testing.T) {
604
605
606 t.Parallel()
607
608 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
609 defer cancel()
610
611 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
612 require.NoError(t, err)
613 defer pool.Close()
614
615 _, err = pool.Exec(ctx, `drop table if exists poolcopyfromtest`)
616 require.NoError(t, err)
617
618 _, err = pool.Exec(ctx, `create table poolcopyfromtest(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
619 require.NoError(t, err)
620 defer pool.Exec(ctx, `drop table poolcopyfromtest`)
621
622 tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
623
624 inputRows := [][]interface{}{
625 {int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
626 {nil, nil, nil, nil, nil, nil, nil},
627 }
628
629 copyCount, err := pool.CopyFrom(ctx, pgx.Identifier{"poolcopyfromtest"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
630 assert.NoError(t, err)
631 assert.EqualValues(t, len(inputRows), copyCount)
632
633 rows, err := pool.Query(ctx, "select * from poolcopyfromtest")
634 assert.NoError(t, err)
635
636 var outputRows [][]interface{}
637 for rows.Next() {
638 row, err := rows.Values()
639 if err != nil {
640 t.Errorf("Unexpected error for rows.Values(): %v", err)
641 }
642 outputRows = append(outputRows, row)
643 }
644
645 assert.NoError(t, rows.Err())
646 assert.Equal(t, inputRows, outputRows)
647 }
648
649 func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
650 t.Parallel()
651
652 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
653 defer cancel()
654 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
655 require.NoError(t, err)
656 defer pool.Close()
657
658 err = pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
659 if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
660 t.Skip("Server does not support backend PID")
661 }
662 return nil
663 })
664 require.NoError(t, err)
665
666 c, err := pool.Acquire(ctx)
667 require.NoError(t, err)
668
669 pid := c.Conn().PgConn().PID()
670
671 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
672
673 _, err = c.Exec(ctx, "begin")
674 assert.NoError(t, err)
675
676 assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
677
678 _, err = c.Exec(ctx, "selct")
679 assert.Error(t, err)
680
681 assert.Equal(t, byte('E'), c.Conn().PgConn().TxStatus())
682
683 c.Release()
684 waitForReleaseToComplete()
685
686 c, err = pool.Acquire(ctx)
687 require.NoError(t, err)
688
689 assert.NotEqual(t, pid, c.Conn().PgConn().PID())
690 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
691
692 c.Release()
693 }
694
695 func TestConnReleaseClosesConnInTransaction(t *testing.T) {
696 t.Parallel()
697
698 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
699 defer cancel()
700 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
701 require.NoError(t, err)
702 defer pool.Close()
703
704 err = pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
705 if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
706 t.Skip("Server does not support backend PID")
707 }
708 return nil
709 })
710 require.NoError(t, err)
711
712 c, err := pool.Acquire(ctx)
713 require.NoError(t, err)
714
715 pid := c.Conn().PgConn().PID()
716
717 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
718
719 _, err = c.Exec(ctx, "begin")
720 assert.NoError(t, err)
721
722 assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
723
724 c.Release()
725 waitForReleaseToComplete()
726
727 c, err = pool.Acquire(ctx)
728 require.NoError(t, err)
729
730 assert.NotEqual(t, pid, c.Conn().PgConn().PID())
731 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
732
733 c.Release()
734 }
735
736 func TestConnReleaseDestroysClosedConn(t *testing.T) {
737 t.Parallel()
738
739 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
740 defer cancel()
741 pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
742 require.NoError(t, err)
743 defer pool.Close()
744
745 c, err := pool.Acquire(ctx)
746 require.NoError(t, err)
747
748 err = c.Conn().Close(ctx)
749 require.NoError(t, err)
750
751 assert.EqualValues(t, 1, pool.Stat().TotalConns())
752
753 c.Release()
754 waitForReleaseToComplete()
755
756
757 for i := 0; i < 1000; i++ {
758 if pool.Stat().TotalConns() == 0 {
759 break
760 }
761 time.Sleep(time.Millisecond)
762 }
763
764 assert.EqualValues(t, 0, pool.Stat().TotalConns())
765 }
766
767 func TestConnPoolQueryConcurrentLoad(t *testing.T) {
768 t.Parallel()
769
770 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
771 require.NoError(t, err)
772 defer pool.Close()
773
774 n := 100
775 done := make(chan bool)
776
777 for i := 0; i < n; i++ {
778 go func() {
779 defer func() { done <- true }()
780 testQuery(t, pool)
781 testQueryRow(t, pool)
782 }()
783 }
784
785 for i := 0; i < n; i++ {
786 <-done
787 }
788 }
789
790 func TestConnReleaseWhenBeginFail(t *testing.T) {
791 t.Parallel()
792
793 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
794 defer cancel()
795
796 db, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
797 require.NoError(t, err)
798 defer db.Close()
799
800 tx, err := db.BeginTx(ctx, pgx.TxOptions{
801 IsoLevel: pgx.TxIsoLevel("foo"),
802 })
803 assert.Error(t, err)
804 if !assert.Zero(t, tx) {
805 err := tx.Rollback(ctx)
806 assert.NoError(t, err)
807 }
808
809 for i := 0; i < 1000; i++ {
810 if db.Stat().TotalConns() == 0 {
811 break
812 }
813 time.Sleep(time.Millisecond)
814 }
815
816 assert.EqualValues(t, 0, db.Stat().TotalConns())
817 }
818
819 func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
820 db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
821 require.NoError(t, err)
822 defer db.Close()
823
824 createSql := `
825 drop table if exists pgxpooltx;
826 create temporary table pgxpooltx(
827 id integer,
828 unique (id)
829 );
830 `
831
832 _, err = db.Exec(context.Background(), createSql)
833 require.NoError(t, err)
834
835 defer func() {
836 db.Exec(context.Background(), "drop table pgxpooltx")
837 }()
838
839 err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
840 _, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (1)")
841 require.NoError(t, err)
842
843 err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
844 _, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (2)")
845 require.NoError(t, err)
846
847 err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
848 _, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (3)")
849 require.NoError(t, err)
850 return nil
851 })
852 require.NoError(t, err)
853 return nil
854 })
855 require.NoError(t, err)
856 return nil
857 })
858 require.NoError(t, err)
859
860 var n int64
861 err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n)
862 require.NoError(t, err)
863 require.EqualValues(t, 3, n)
864 }
865
866 func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
867 db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
868 require.NoError(t, err)
869 defer db.Close()
870
871 createSql := `
872 drop table if exists pgxpooltx;
873 create temporary table pgxpooltx(
874 id integer,
875 unique (id)
876 );
877 `
878
879 _, err = db.Exec(context.Background(), createSql)
880 require.NoError(t, err)
881
882 defer func() {
883 db.Exec(context.Background(), "drop table pgxpooltx")
884 }()
885
886 err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
887 _, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (1)")
888 require.NoError(t, err)
889
890 err = db.BeginFunc(context.Background(), func(db pgx.Tx) error {
891 _, err := db.Exec(context.Background(), "insert into pgxpooltx(id) values (2)")
892 require.NoError(t, err)
893 return errors.New("do a rollback")
894 })
895 require.EqualError(t, err, "do a rollback")
896
897 _, err = db.Exec(context.Background(), "insert into pgxpooltx(id) values (3)")
898 require.NoError(t, err)
899
900 return nil
901 })
902 require.NoError(t, err)
903
904 var n int64
905 err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n)
906 require.NoError(t, err)
907 require.EqualValues(t, 2, n)
908 }
909
910 func TestIdempotentPoolClose(t *testing.T) {
911 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
912 require.NoError(t, err)
913
914
915 require.NotPanics(t, func() { pool.Close() })
916
917
918 require.NotPanics(t, func() { pool.Close() })
919 }
920
921 func TestConnectCreatesMinPool(t *testing.T) {
922 t.Parallel()
923
924 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
925 require.NoError(t, err)
926
927 config.MinConns = int32(12)
928 config.MaxConns = int32(15)
929 config.LazyConnect = false
930
931 acquireAttempts := int64(0)
932 connectAttempts := int64(0)
933
934 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
935 atomic.AddInt64(&acquireAttempts, 1)
936 return true
937 }
938 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
939 atomic.AddInt64(&connectAttempts, 1)
940 return nil
941 }
942
943 pool, err := pgxpool.ConnectConfig(context.Background(), config)
944 require.NoError(t, err)
945 defer pool.Close()
946
947 stat := pool.Stat()
948 require.Equal(t, int32(12), stat.IdleConns())
949 require.Equal(t, int64(1), stat.AcquireCount())
950 require.Equal(t, int32(12), stat.TotalConns())
951 require.Equal(t, int64(0), acquireAttempts)
952 require.Equal(t, int64(12), connectAttempts)
953 }
954 func TestConnectSkipMinPoolWithLazy(t *testing.T) {
955 t.Parallel()
956
957 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
958 require.NoError(t, err)
959
960 config.MinConns = int32(12)
961 config.MaxConns = int32(15)
962 config.LazyConnect = true
963
964 acquireAttempts := int64(0)
965 connectAttempts := int64(0)
966
967 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
968 atomic.AddInt64(&acquireAttempts, 1)
969 return true
970 }
971 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
972 atomic.AddInt64(&connectAttempts, 1)
973 return nil
974 }
975
976 pool, err := pgxpool.ConnectConfig(context.Background(), config)
977 require.NoError(t, err)
978 defer pool.Close()
979
980 stat := pool.Stat()
981 require.Equal(t, int32(0), stat.IdleConns())
982 require.Equal(t, int64(0), stat.AcquireCount())
983 require.Equal(t, int32(0), stat.TotalConns())
984 require.Equal(t, int64(0), acquireAttempts)
985 require.Equal(t, int64(0), connectAttempts)
986 }
987
988 func TestConnectMinPoolZero(t *testing.T) {
989 t.Parallel()
990
991 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
992 require.NoError(t, err)
993
994 config.MinConns = int32(0)
995 config.MaxConns = int32(15)
996 config.LazyConnect = false
997
998 acquireAttempts := int64(0)
999 connectAttempts := int64(0)
1000
1001 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
1002 atomic.AddInt64(&acquireAttempts, 1)
1003 return true
1004 }
1005 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
1006 atomic.AddInt64(&connectAttempts, 1)
1007 return nil
1008 }
1009
1010 pool, err := pgxpool.ConnectConfig(context.Background(), config)
1011 require.NoError(t, err)
1012 defer pool.Close()
1013
1014 stat := pool.Stat()
1015 require.Equal(t, int32(1), stat.IdleConns())
1016 require.Equal(t, int64(1), stat.AcquireCount())
1017 require.Equal(t, int32(1), stat.TotalConns())
1018 require.Equal(t, int64(0), acquireAttempts)
1019 require.Equal(t, int64(1), connectAttempts)
1020 }
1021
1022 func TestCreateMinPoolClosesConnectionsOnError(t *testing.T) {
1023 t.Parallel()
1024
1025 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
1026 require.NoError(t, err)
1027
1028 config.MinConns = int32(12)
1029 config.MaxConns = int32(15)
1030 config.LazyConnect = false
1031
1032 acquireAttempts := int64(0)
1033 madeConnections := int64(0)
1034 conns := make(chan *pgx.Conn, 15)
1035
1036 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
1037 atomic.AddInt64(&acquireAttempts, 1)
1038 return true
1039 }
1040 config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
1041 conns <- conn
1042
1043 atomic.AddInt64(&madeConnections, 1)
1044 mc := atomic.LoadInt64(&madeConnections)
1045 if mc == 10 {
1046 return errors.New("mock error")
1047 }
1048 return nil
1049 }
1050 pool, err := pgxpool.ConnectConfig(context.Background(), config)
1051 require.Error(t, err)
1052 require.Nil(t, pool)
1053
1054 close(conns)
1055 for conn := range conns {
1056 require.True(t, conn.IsClosed())
1057 }
1058
1059 require.Equal(t, int64(0), acquireAttempts)
1060 require.True(t, madeConnections >= 10, "Expected %d got %d", 10, madeConnections)
1061 }
1062
1063 func TestCreateMinPoolReturnsFirstError(t *testing.T) {
1064 t.Parallel()
1065
1066 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
1067 require.NoError(t, err)
1068
1069 config.MinConns = int32(12)
1070 config.MaxConns = int32(15)
1071 config.LazyConnect = false
1072
1073 acquireAttempts := int64(0)
1074 connectAttempts := int64(0)
1075
1076 mockErr := errors.New("mock connect error")
1077
1078 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
1079 atomic.AddInt64(&acquireAttempts, 1)
1080 return true
1081 }
1082 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
1083 atomic.AddInt64(&connectAttempts, 1)
1084 ca := atomic.LoadInt64(&connectAttempts)
1085 if ca >= 5 {
1086 return mockErr
1087 }
1088 return nil
1089 }
1090
1091 pool, err := pgxpool.ConnectConfig(context.Background(), config)
1092 require.Nil(t, pool)
1093 require.Error(t, err)
1094
1095 require.True(t, connectAttempts >= 5, "Expected %d got %d", 5, connectAttempts)
1096 require.ErrorIs(t, err, mockErr)
1097 }
1098
1099 func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
1100 t.Parallel()
1101
1102 pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
1103 require.NoError(t, err)
1104 defer pool.Close()
1105
1106 errChan := make(chan error)
1107 testCount := 5000
1108
1109 for i := 0; i < testCount; i++ {
1110 go func() {
1111 batch := &pgx.Batch{}
1112 batch.Queue("select 1")
1113 batch.Queue("select 2")
1114
1115 br := pool.SendBatch(context.Background(), batch)
1116 defer br.Close()
1117
1118 var err error
1119 var n int32
1120 err = br.QueryRow().Scan(&n)
1121 if err != nil {
1122 errChan <- err
1123 return
1124 }
1125 if n != 1 {
1126 errChan <- fmt.Errorf("expected 1 got %v", n)
1127 return
1128 }
1129
1130 err = br.QueryRow().Scan(&n)
1131 if err != nil {
1132 errChan <- err
1133 return
1134 }
1135 if n != 2 {
1136 errChan <- fmt.Errorf("expected 2 got %v", n)
1137 return
1138 }
1139
1140 err = br.Close()
1141 errChan <- err
1142 }()
1143 }
1144
1145 for i := 0; i < testCount; i++ {
1146 err := <-errChan
1147 assert.NoError(t, err)
1148 }
1149 }
1150
View as plain text