1 package pgxpool_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "sync/atomic"
9 "testing"
10 "time"
11
12 "github.com/jackc/pgx/v5"
13 "github.com/jackc/pgx/v5/pgxpool"
14 "github.com/jackc/pgx/v5/pgxtest"
15 "github.com/stretchr/testify/assert"
16 "github.com/stretchr/testify/require"
17 )
18
19 func TestConnect(t *testing.T) {
20 t.Parallel()
21 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
22 defer cancel()
23 connString := os.Getenv("PGX_TEST_DATABASE")
24 pool, err := pgxpool.New(ctx, connString)
25 require.NoError(t, err)
26 assert.Equal(t, connString, pool.Config().ConnString())
27 pool.Close()
28 }
29
30 func TestConnectConfig(t *testing.T) {
31 t.Parallel()
32 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
33 defer cancel()
34 connString := os.Getenv("PGX_TEST_DATABASE")
35 config, err := pgxpool.ParseConfig(connString)
36 require.NoError(t, err)
37 pool, err := pgxpool.NewWithConfig(ctx, config)
38 require.NoError(t, err)
39 assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config")
40 pool.Close()
41 }
42
43 func TestParseConfigExtractsPoolArguments(t *testing.T) {
44 t.Parallel()
45
46 config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1")
47 assert.NoError(t, err)
48 assert.EqualValues(t, 42, config.MaxConns)
49 assert.EqualValues(t, 1, config.MinConns)
50 assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
51 assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
52 }
53
54 func TestConstructorIgnoresContext(t *testing.T) {
55 t.Parallel()
56
57 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
58 assert.NoError(t, err)
59 var cancel func()
60 config.BeforeConnect = func(context.Context, *pgx.ConnConfig) error {
61
62
63 cancel()
64 return nil
65 }
66
67 pool, err := pgxpool.NewWithConfig(context.Background(), config)
68 require.NoError(t, err)
69
70 assert.EqualValues(t, 0, pool.Stat().TotalConns())
71
72 var ctx context.Context
73 ctx, cancel = context.WithCancel(context.Background())
74 defer cancel()
75 _, err = pool.Exec(ctx, "SELECT 1")
76 assert.ErrorIs(t, err, context.Canceled)
77 assert.EqualValues(t, 1, pool.Stat().TotalConns())
78 }
79
80 func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
81 t.Parallel()
82
83 config := &pgxpool.Config{}
84
85 require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.NewWithConfig(context.Background(), config) })
86 }
87
88 func TestConfigCopyReturnsEqualConfig(t *testing.T) {
89 connString := "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"
90 original, err := pgxpool.ParseConfig(connString)
91 require.NoError(t, err)
92
93 copied := original.Copy()
94
95 assertConfigsEqual(t, original, copied, t.Name())
96 }
97
98 func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
99 connString := os.Getenv("PGX_TEST_DATABASE")
100 original, err := pgxpool.ParseConfig(connString)
101 require.NoError(t, err)
102
103 copied := original.Copy()
104 assert.NotPanics(t, func() {
105 _, err = pgxpool.NewWithConfig(context.Background(), copied)
106 })
107 assert.NoError(t, err)
108 }
109
110 func TestPoolAcquireAndConnRelease(t *testing.T) {
111 t.Parallel()
112
113 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
114 defer cancel()
115
116 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
117 require.NoError(t, err)
118 defer pool.Close()
119
120 c, err := pool.Acquire(ctx)
121 require.NoError(t, err)
122 c.Release()
123 }
124
125 func TestPoolAcquireAndConnHijack(t *testing.T) {
126 t.Parallel()
127
128 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
129 defer cancel()
130
131 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
132 require.NoError(t, err)
133 defer pool.Close()
134
135 c, err := pool.Acquire(ctx)
136 require.NoError(t, err)
137
138 connsBeforeHijack := pool.Stat().TotalConns()
139
140 conn := c.Hijack()
141 defer conn.Close(ctx)
142
143 connsAfterHijack := pool.Stat().TotalConns()
144 require.Equal(t, connsBeforeHijack-1, connsAfterHijack)
145
146 var n int32
147 err = conn.QueryRow(ctx, `select 1`).Scan(&n)
148 require.NoError(t, err)
149 require.Equal(t, int32(1), n)
150 }
151
// TestPoolAcquireChecksIdleConns terminates every pooled backend from a
// separate controller connection and then checks that the pool ends up with a
// single connection whose backend PID is not one of the terminated ones.
func TestPoolAcquireChecksIdleConns(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	// controllerConn lives outside the pool and is used to terminate the
	// pool's backends.
	controllerConn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
	require.NoError(t, err)
	defer controllerConn.Close(ctx)
	pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")

	pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
	require.NoError(t, err)
	defer pool.Close()

	// Hold three connections at once so the pool has to open three of them.
	var conns []*pgxpool.Conn
	for i := 0; i < 3; i++ {
		c, err := pool.Acquire(ctx)
		require.NoError(t, err)
		conns = append(conns, c)
	}

	require.EqualValues(t, 3, pool.Stat().TotalConns())

	// Record each backend PID, then return the connection to the pool.
	var pids []uint32
	for _, c := range conns {
		pids = append(pids, c.Conn().PgConn().PID())
		c.Release()
	}

	// Terminate all recorded backends server-side.
	_, err = controllerConn.Exec(ctx, `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids)
	require.NoError(t, err)

	// The pool still counts all three connections right after termination.
	require.EqualValues(t, 3, pool.Stat().TotalConns())

	// NOTE(review): presumably this wait lets the connections sit idle long
	// enough for the pool to re-check them on the next acquire — confirm
	// against the pool's idle-check threshold.
	time.Sleep(time.Second)

	// Ping must succeed even though every previously pooled backend is gone.
	err = pool.Ping(ctx)
	require.NoError(t, err)

	// Only one connection remains after the ping.
	require.EqualValues(t, 1, pool.Stat().TotalConns())
	c, err := pool.Acquire(ctx)
	require.NoError(t, err)

	cPID := c.Conn().PgConn().PID()
	c.Release()

	// The surviving connection must not be one of the terminated backends.
	require.NotContains(t, pids, cPID)
}
205
206 func TestPoolAcquireFunc(t *testing.T) {
207 t.Parallel()
208
209 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
210 defer cancel()
211
212 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
213 require.NoError(t, err)
214 defer pool.Close()
215
216 var n int32
217 err = pool.AcquireFunc(ctx, func(c *pgxpool.Conn) error {
218 return c.QueryRow(ctx, "select 1").Scan(&n)
219 })
220 require.NoError(t, err)
221 require.EqualValues(t, 1, n)
222 }
223
224 func TestPoolAcquireFuncReturnsFnError(t *testing.T) {
225 t.Parallel()
226
227 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
228 defer cancel()
229
230 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
231 require.NoError(t, err)
232 defer pool.Close()
233
234 err = pool.AcquireFunc(ctx, func(c *pgxpool.Conn) error {
235 return fmt.Errorf("some error")
236 })
237 require.EqualError(t, err, "some error")
238 }
239
240 func TestPoolBeforeConnect(t *testing.T) {
241 t.Parallel()
242
243 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
244 defer cancel()
245
246 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
247 require.NoError(t, err)
248
249 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
250 cfg.Config.RuntimeParams["application_name"] = "pgx"
251 return nil
252 }
253
254 db, err := pgxpool.NewWithConfig(ctx, config)
255 require.NoError(t, err)
256 defer db.Close()
257
258 var str string
259 err = db.QueryRow(ctx, "SHOW application_name").Scan(&str)
260 require.NoError(t, err)
261 assert.EqualValues(t, "pgx", str)
262 }
263
264 func TestPoolAfterConnect(t *testing.T) {
265 t.Parallel()
266
267 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
268 defer cancel()
269
270 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
271 require.NoError(t, err)
272
273 config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
274 _, err := c.Prepare(ctx, "ps1", "select 1")
275 return err
276 }
277
278 db, err := pgxpool.NewWithConfig(ctx, config)
279 require.NoError(t, err)
280 defer db.Close()
281
282 var n int32
283 err = db.QueryRow(ctx, "ps1").Scan(&n)
284 require.NoError(t, err)
285 assert.EqualValues(t, 1, n)
286 }
287
288 func TestPoolBeforeAcquire(t *testing.T) {
289 t.Parallel()
290
291 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
292 defer cancel()
293
294 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
295 require.NoError(t, err)
296
297 acquireAttempts := 0
298
299 config.BeforeAcquire = func(ctx context.Context, c *pgx.Conn) bool {
300 acquireAttempts++
301 return acquireAttempts%2 == 0
302 }
303
304 db, err := pgxpool.NewWithConfig(ctx, config)
305 require.NoError(t, err)
306 defer db.Close()
307
308 conns := make([]*pgxpool.Conn, 4)
309 for i := range conns {
310 conns[i], err = db.Acquire(ctx)
311 assert.NoError(t, err)
312 }
313
314 for _, c := range conns {
315 c.Release()
316 }
317 waitForReleaseToComplete()
318
319 assert.EqualValues(t, 8, acquireAttempts)
320
321 conns = db.AcquireAllIdle(ctx)
322 assert.Len(t, conns, 2)
323
324 for _, c := range conns {
325 c.Release()
326 }
327 waitForReleaseToComplete()
328
329 assert.EqualValues(t, 12, acquireAttempts)
330 }
331
332 func TestPoolAfterRelease(t *testing.T) {
333 t.Parallel()
334
335 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
336 defer cancel()
337
338 func() {
339 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
340 require.NoError(t, err)
341 defer pool.Close()
342 }()
343
344 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
345 require.NoError(t, err)
346
347 afterReleaseCount := 0
348
349 config.AfterRelease = func(c *pgx.Conn) bool {
350 afterReleaseCount++
351 return afterReleaseCount%2 == 1
352 }
353
354 db, err := pgxpool.NewWithConfig(ctx, config)
355 require.NoError(t, err)
356 defer db.Close()
357
358 connPIDs := map[uint32]struct{}{}
359
360 for i := 0; i < 10; i++ {
361 conn, err := db.Acquire(ctx)
362 assert.NoError(t, err)
363 connPIDs[conn.Conn().PgConn().PID()] = struct{}{}
364 conn.Release()
365 waitForReleaseToComplete()
366 }
367
368 assert.EqualValues(t, 5, len(connPIDs))
369 }
370
371 func TestPoolBeforeClose(t *testing.T) {
372 t.Parallel()
373
374 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
375 defer cancel()
376
377 func() {
378 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
379 require.NoError(t, err)
380 defer pool.Close()
381 }()
382
383 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
384 require.NoError(t, err)
385
386 connPIDs := make(chan uint32, 5)
387 config.BeforeClose = func(c *pgx.Conn) {
388 connPIDs <- c.PgConn().PID()
389 }
390
391 db, err := pgxpool.NewWithConfig(ctx, config)
392 require.NoError(t, err)
393 defer db.Close()
394
395 acquiredPIDs := make([]uint32, 0, 5)
396 closedPIDs := make([]uint32, 0, 5)
397 for i := 0; i < 5; i++ {
398 conn, err := db.Acquire(ctx)
399 assert.NoError(t, err)
400 acquiredPIDs = append(acquiredPIDs, conn.Conn().PgConn().PID())
401 conn.Release()
402 db.Reset()
403 closedPIDs = append(closedPIDs, <-connPIDs)
404 }
405
406 assert.ElementsMatch(t, acquiredPIDs, closedPIDs)
407 }
408
409 func TestPoolAcquireAllIdle(t *testing.T) {
410 t.Parallel()
411
412 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
413 defer cancel()
414
415 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
416 require.NoError(t, err)
417 defer db.Close()
418
419 conns := make([]*pgxpool.Conn, 3)
420 for i := range conns {
421 conns[i], err = db.Acquire(ctx)
422 assert.NoError(t, err)
423 }
424
425 for _, c := range conns {
426 if c != nil {
427 c.Release()
428 }
429 }
430 waitForReleaseToComplete()
431
432 conns = db.AcquireAllIdle(ctx)
433 assert.Len(t, conns, 3)
434
435 for _, c := range conns {
436 c.Release()
437 }
438 }
439
440 func TestPoolReset(t *testing.T) {
441 t.Parallel()
442
443 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
444 defer cancel()
445
446 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
447 require.NoError(t, err)
448 defer db.Close()
449
450 conns := make([]*pgxpool.Conn, 3)
451 for i := range conns {
452 conns[i], err = db.Acquire(ctx)
453 assert.NoError(t, err)
454 }
455
456 db.Reset()
457
458 for _, c := range conns {
459 if c != nil {
460 c.Release()
461 }
462 }
463 waitForReleaseToComplete()
464
465 require.EqualValues(t, 0, db.Stat().TotalConns())
466 }
467
468 func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
469 t.Parallel()
470
471 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
472 defer cancel()
473
474 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
475 require.NoError(t, err)
476
477 config.MaxConnLifetime = 250 * time.Millisecond
478
479 db, err := pgxpool.NewWithConfig(ctx, config)
480 require.NoError(t, err)
481 defer db.Close()
482
483 c, err := db.Acquire(ctx)
484 require.NoError(t, err)
485
486 time.Sleep(config.MaxConnLifetime)
487
488 c.Release()
489 waitForReleaseToComplete()
490
491 stats := db.Stat()
492 assert.EqualValues(t, 0, stats.TotalConns())
493 }
494
495 func TestConnReleaseClosesBusyConn(t *testing.T) {
496 t.Parallel()
497
498 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
499 defer cancel()
500
501 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
502 require.NoError(t, err)
503 defer db.Close()
504
505 c, err := db.Acquire(ctx)
506 require.NoError(t, err)
507
508 _, err = c.Query(ctx, "select generate_series(1,10)")
509 require.NoError(t, err)
510
511 c.Release()
512 waitForReleaseToComplete()
513
514
515 for i := 0; i < 1000; i++ {
516 if db.Stat().TotalConns() == 0 {
517 break
518 }
519 time.Sleep(time.Millisecond)
520 }
521
522 stats := db.Stat()
523 assert.EqualValues(t, 0, stats.TotalConns())
524 }
525
526 func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
527 t.Parallel()
528
529 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
530 defer cancel()
531
532 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
533 require.NoError(t, err)
534
535 config.MaxConnLifetime = 100 * time.Millisecond
536 config.HealthCheckPeriod = 100 * time.Millisecond
537
538 db, err := pgxpool.NewWithConfig(ctx, config)
539 require.NoError(t, err)
540 defer db.Close()
541
542 c, err := db.Acquire(ctx)
543 require.NoError(t, err)
544 c.Release()
545 time.Sleep(config.MaxConnLifetime + 500*time.Millisecond)
546
547 stats := db.Stat()
548 assert.EqualValues(t, 0, stats.TotalConns())
549 assert.EqualValues(t, 0, stats.MaxIdleDestroyCount())
550 assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount())
551 assert.EqualValues(t, 1, stats.NewConnsCount())
552 }
553
554 func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
555 t.Parallel()
556
557 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
558 defer cancel()
559
560 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
561 require.NoError(t, err)
562
563 config.MaxConnLifetime = 1 * time.Minute
564 config.MaxConnIdleTime = 100 * time.Millisecond
565 config.HealthCheckPeriod = 150 * time.Millisecond
566
567 db, err := pgxpool.NewWithConfig(ctx, config)
568 require.NoError(t, err)
569 defer db.Close()
570
571 c, err := db.Acquire(ctx)
572 require.NoError(t, err)
573 c.Release()
574 time.Sleep(config.HealthCheckPeriod)
575
576 for i := 0; i < 1000; i++ {
577 if db.Stat().TotalConns() == 0 {
578 break
579 }
580 time.Sleep(time.Millisecond)
581 }
582
583 stats := db.Stat()
584 assert.EqualValues(t, 0, stats.TotalConns())
585 assert.EqualValues(t, 1, stats.MaxIdleDestroyCount())
586 assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
587 assert.EqualValues(t, 1, stats.NewConnsCount())
588 }
589
// TestPoolBackgroundChecksMinConns verifies that the pool reaches MinConns
// idle connections without any acquire, and that it returns to that number
// after one of the connections is closed by the caller.
func TestPoolBackgroundChecksMinConns(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
	require.NoError(t, err)

	config.HealthCheckPeriod = 100 * time.Millisecond
	config.MinConns = 2

	db, err := pgxpool.NewWithConfig(ctx, config)
	require.NoError(t, err)
	defer db.Close()

	// Poll every 50ms until two idle connections exist, or until ctx expires;
	// the require calls below report the failure in the latter case.
	stats := db.Stat()
	for !(stats.IdleConns() == 2 && stats.MaxLifetimeDestroyCount() == 0 && stats.NewConnsCount() == 2) && ctx.Err() == nil {
		time.Sleep(50 * time.Millisecond)
		stats = db.Stat()
	}
	require.EqualValues(t, 2, stats.IdleConns())
	require.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
	require.EqualValues(t, 2, stats.NewConnsCount())

	c, err := db.Acquire(ctx)
	require.NoError(t, err)

	// Acquiring takes one of the existing idle connections; no new connection
	// is opened.
	stats = db.Stat()
	require.EqualValues(t, 1, stats.IdleConns())
	require.EqualValues(t, 0, stats.MaxLifetimeDestroyCount())
	require.EqualValues(t, 2, stats.NewConnsCount())

	// Close the underlying connection before handing it back to the pool.
	err = c.Conn().Close(ctx)
	require.NoError(t, err)
	c.Release()

	// Poll until a third connection has been opened and two are idle again,
	// or until ctx expires.
	stats = db.Stat()
	for !(stats.IdleConns() == 2 && stats.MaxIdleDestroyCount() == 0 && stats.NewConnsCount() == 3) && ctx.Err() == nil {
		time.Sleep(50 * time.Millisecond)
		stats = db.Stat()
	}
	require.EqualValues(t, 2, stats.TotalConns())
	require.EqualValues(t, 0, stats.MaxIdleDestroyCount())
	require.EqualValues(t, 3, stats.NewConnsCount())
}
636
637 func TestPoolExec(t *testing.T) {
638 t.Parallel()
639
640 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
641 defer cancel()
642
643 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
644 require.NoError(t, err)
645 defer pool.Close()
646
647 testExec(t, ctx, pool)
648 }
649
650 func TestPoolQuery(t *testing.T) {
651 t.Parallel()
652
653 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
654 defer cancel()
655
656 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
657 require.NoError(t, err)
658 defer pool.Close()
659
660
661 testQuery(t, ctx, pool)
662 waitForReleaseToComplete()
663
664
665 rows, err := pool.Query(ctx, "select generate_series(1,$1)", 10)
666 require.NoError(t, err)
667
668 stats := pool.Stat()
669 assert.EqualValues(t, 1, stats.AcquiredConns())
670 assert.EqualValues(t, 1, stats.TotalConns())
671
672 rows.Close()
673 assert.NoError(t, rows.Err())
674 waitForReleaseToComplete()
675
676 stats = pool.Stat()
677 assert.EqualValues(t, 0, stats.AcquiredConns())
678 assert.EqualValues(t, 1, stats.TotalConns())
679
680 }
681
682 func TestPoolQueryRow(t *testing.T) {
683 t.Parallel()
684
685 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
686 defer cancel()
687
688 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
689 require.NoError(t, err)
690 defer pool.Close()
691
692 testQueryRow(t, ctx, pool)
693 waitForReleaseToComplete()
694
695 stats := pool.Stat()
696 assert.EqualValues(t, 0, stats.AcquiredConns())
697 assert.EqualValues(t, 1, stats.TotalConns())
698 }
699
700
701 func TestPoolQueryRowErrNoRows(t *testing.T) {
702 t.Parallel()
703
704 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
705 defer cancel()
706
707 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
708 require.NoError(t, err)
709 defer pool.Close()
710
711 err = pool.QueryRow(ctx, "select n from generate_series(1,10) n where n=0").Scan(nil)
712 require.Equal(t, pgx.ErrNoRows, err)
713 }
714
715
716 func TestPoolQueryRowScanPanicReleasesConnection(t *testing.T) {
717 t.Parallel()
718
719 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
720 defer cancel()
721
722 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
723 require.NoError(t, err)
724 defer pool.Close()
725
726 require.Panics(t, func() {
727 var greeting *string
728 pool.QueryRow(ctx, "select 'Hello, world!'").Scan(greeting)
729 })
730
731
732 }
733
734 func TestPoolSendBatch(t *testing.T) {
735 t.Parallel()
736
737 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
738 defer cancel()
739
740 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
741 require.NoError(t, err)
742 defer pool.Close()
743
744 testSendBatch(t, ctx, pool)
745 waitForReleaseToComplete()
746
747 stats := pool.Stat()
748 assert.EqualValues(t, 0, stats.AcquiredConns())
749 assert.EqualValues(t, 1, stats.TotalConns())
750 }
751
752 func TestPoolCopyFrom(t *testing.T) {
753
754
755 t.Parallel()
756
757 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
758 defer cancel()
759
760 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
761 require.NoError(t, err)
762 defer pool.Close()
763
764 _, err = pool.Exec(ctx, `drop table if exists poolcopyfromtest`)
765 require.NoError(t, err)
766
767 _, err = pool.Exec(ctx, `create table poolcopyfromtest(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
768 require.NoError(t, err)
769 defer pool.Exec(ctx, `drop table poolcopyfromtest`)
770
771 tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
772
773 inputRows := [][]any{
774 {int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
775 {nil, nil, nil, nil, nil, nil, nil},
776 }
777
778 copyCount, err := pool.CopyFrom(ctx, pgx.Identifier{"poolcopyfromtest"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
779 assert.NoError(t, err)
780 assert.EqualValues(t, len(inputRows), copyCount)
781
782 rows, err := pool.Query(ctx, "select * from poolcopyfromtest")
783 assert.NoError(t, err)
784
785 var outputRows [][]any
786 for rows.Next() {
787 row, err := rows.Values()
788 if err != nil {
789 t.Errorf("Unexpected error for rows.Values(): %v", err)
790 }
791 outputRows = append(outputRows, row)
792 }
793
794 assert.NoError(t, rows.Err())
795 assert.Equal(t, inputRows, outputRows)
796 }
797
798 func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
799 t.Parallel()
800
801 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
802 defer cancel()
803
804 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
805 require.NoError(t, err)
806 defer pool.Close()
807
808 c, err := pool.Acquire(ctx)
809 require.NoError(t, err)
810
811 pid := c.Conn().PgConn().PID()
812
813 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
814
815 _, err = c.Exec(ctx, "begin")
816 assert.NoError(t, err)
817
818 assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
819
820 _, err = c.Exec(ctx, "selct")
821 assert.Error(t, err)
822
823 assert.Equal(t, byte('E'), c.Conn().PgConn().TxStatus())
824
825 c.Release()
826 waitForReleaseToComplete()
827
828 c, err = pool.Acquire(ctx)
829 require.NoError(t, err)
830
831 assert.NotEqual(t, pid, c.Conn().PgConn().PID())
832 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
833
834 c.Release()
835 }
836
837 func TestConnReleaseClosesConnInTransaction(t *testing.T) {
838 t.Parallel()
839
840 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
841 defer cancel()
842
843 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
844 require.NoError(t, err)
845 defer pool.Close()
846
847 c, err := pool.Acquire(ctx)
848 require.NoError(t, err)
849
850 pid := c.Conn().PgConn().PID()
851
852 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
853
854 _, err = c.Exec(ctx, "begin")
855 assert.NoError(t, err)
856
857 assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus())
858
859 c.Release()
860 waitForReleaseToComplete()
861
862 c, err = pool.Acquire(ctx)
863 require.NoError(t, err)
864
865 assert.NotEqual(t, pid, c.Conn().PgConn().PID())
866 assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus())
867
868 c.Release()
869 }
870
871 func TestConnReleaseDestroysClosedConn(t *testing.T) {
872 t.Parallel()
873
874 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
875 defer cancel()
876
877 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
878 require.NoError(t, err)
879 defer pool.Close()
880
881 c, err := pool.Acquire(ctx)
882 require.NoError(t, err)
883
884 err = c.Conn().Close(ctx)
885 require.NoError(t, err)
886
887 assert.EqualValues(t, 1, pool.Stat().TotalConns())
888
889 c.Release()
890 waitForReleaseToComplete()
891
892
893 for i := 0; i < 1000; i++ {
894 if pool.Stat().TotalConns() == 0 {
895 break
896 }
897 time.Sleep(time.Millisecond)
898 }
899
900 assert.EqualValues(t, 0, pool.Stat().TotalConns())
901 }
902
903 func TestConnPoolQueryConcurrentLoad(t *testing.T) {
904 t.Parallel()
905
906 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
907 defer cancel()
908
909 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
910 require.NoError(t, err)
911 defer pool.Close()
912
913 n := 100
914 done := make(chan bool)
915
916 for i := 0; i < n; i++ {
917 go func() {
918 defer func() { done <- true }()
919 testQuery(t, ctx, pool)
920 testQueryRow(t, ctx, pool)
921 }()
922 }
923
924 for i := 0; i < n; i++ {
925 <-done
926 }
927 }
928
929 func TestConnReleaseWhenBeginFail(t *testing.T) {
930 t.Parallel()
931
932 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
933 defer cancel()
934
935 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
936 require.NoError(t, err)
937 defer db.Close()
938
939 tx, err := db.BeginTx(ctx, pgx.TxOptions{
940 IsoLevel: pgx.TxIsoLevel("foo"),
941 })
942 assert.Error(t, err)
943 if !assert.Zero(t, tx) {
944 err := tx.Rollback(ctx)
945 assert.NoError(t, err)
946 }
947
948 for i := 0; i < 1000; i++ {
949 if db.Stat().TotalConns() == 0 {
950 break
951 }
952 time.Sleep(time.Millisecond)
953 }
954
955 assert.EqualValues(t, 0, db.Stat().TotalConns())
956 }
957
958 func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
959 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
960 defer cancel()
961
962 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
963 require.NoError(t, err)
964 defer db.Close()
965
966 createSql := `
967 drop table if exists pgxpooltx;
968 create temporary table pgxpooltx(
969 id integer,
970 unique (id)
971 );
972 `
973
974 _, err = db.Exec(ctx, createSql)
975 require.NoError(t, err)
976
977 defer func() {
978 db.Exec(ctx, "drop table pgxpooltx")
979 }()
980
981 err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
982 _, err := db.Exec(ctx, "insert into pgxpooltx(id) values (1)")
983 require.NoError(t, err)
984
985 err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
986 _, err := db.Exec(ctx, "insert into pgxpooltx(id) values (2)")
987 require.NoError(t, err)
988
989 err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
990 _, err := db.Exec(ctx, "insert into pgxpooltx(id) values (3)")
991 require.NoError(t, err)
992 return nil
993 })
994 require.NoError(t, err)
995 return nil
996 })
997 require.NoError(t, err)
998 return nil
999 })
1000 require.NoError(t, err)
1001
1002 var n int64
1003 err = db.QueryRow(ctx, "select count(*) from pgxpooltx").Scan(&n)
1004 require.NoError(t, err)
1005 require.EqualValues(t, 3, n)
1006 }
1007
1008 func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
1009 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1010 defer cancel()
1011
1012 db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
1013 require.NoError(t, err)
1014 defer db.Close()
1015
1016 createSql := `
1017 drop table if exists pgxpooltx;
1018 create temporary table pgxpooltx(
1019 id integer,
1020 unique (id)
1021 );
1022 `
1023
1024 _, err = db.Exec(ctx, createSql)
1025 require.NoError(t, err)
1026
1027 defer func() {
1028 db.Exec(ctx, "drop table pgxpooltx")
1029 }()
1030
1031 err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
1032 _, err := db.Exec(ctx, "insert into pgxpooltx(id) values (1)")
1033 require.NoError(t, err)
1034
1035 err = pgx.BeginFunc(ctx, db, func(db pgx.Tx) error {
1036 _, err := db.Exec(ctx, "insert into pgxpooltx(id) values (2)")
1037 require.NoError(t, err)
1038 return errors.New("do a rollback")
1039 })
1040 require.EqualError(t, err, "do a rollback")
1041
1042 _, err = db.Exec(ctx, "insert into pgxpooltx(id) values (3)")
1043 require.NoError(t, err)
1044
1045 return nil
1046 })
1047 require.NoError(t, err)
1048
1049 var n int64
1050 err = db.QueryRow(ctx, "select count(*) from pgxpooltx").Scan(&n)
1051 require.NoError(t, err)
1052 require.EqualValues(t, 2, n)
1053 }
1054
1055 func TestIdempotentPoolClose(t *testing.T) {
1056 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1057 defer cancel()
1058
1059 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
1060 require.NoError(t, err)
1061
1062
1063 require.NotPanics(t, func() { pool.Close() })
1064
1065
1066 require.NotPanics(t, func() { pool.Close() })
1067 }
1068
1069 func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
1070 t.Parallel()
1071
1072 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1073 defer cancel()
1074
1075 config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
1076 require.NoError(t, err)
1077
1078 config.MinConns = int32(12)
1079 config.MaxConns = int32(15)
1080
1081 acquireAttempts := int64(0)
1082 connectAttempts := int64(0)
1083
1084 config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
1085 atomic.AddInt64(&acquireAttempts, 1)
1086 return true
1087 }
1088 config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
1089 atomic.AddInt64(&connectAttempts, 1)
1090 return nil
1091 }
1092
1093 pool, err := pgxpool.NewWithConfig(ctx, config)
1094 require.NoError(t, err)
1095 defer pool.Close()
1096
1097 for i := 0; i < 500; i++ {
1098 time.Sleep(10 * time.Millisecond)
1099
1100 stat := pool.Stat()
1101 if stat.IdleConns() == 12 && stat.AcquireCount() == 0 && stat.TotalConns() == 12 && atomic.LoadInt64(&acquireAttempts) == 0 && atomic.LoadInt64(&connectAttempts) == 12 {
1102 return
1103 }
1104 }
1105
1106 t.Fatal("did not reach min pool size")
1107
1108 }
1109
1110 func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
1111 t.Parallel()
1112
1113 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1114 defer cancel()
1115
1116 pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
1117 require.NoError(t, err)
1118 defer pool.Close()
1119
1120 errChan := make(chan error)
1121 testCount := 5000
1122
1123 for i := 0; i < testCount; i++ {
1124 go func() {
1125 batch := &pgx.Batch{}
1126 batch.Queue("select 1")
1127 batch.Queue("select 2")
1128
1129 br := pool.SendBatch(ctx, batch)
1130 defer br.Close()
1131
1132 var err error
1133 var n int32
1134 err = br.QueryRow().Scan(&n)
1135 if err != nil {
1136 errChan <- err
1137 return
1138 }
1139 if n != 1 {
1140 errChan <- fmt.Errorf("expected 1 got %v", n)
1141 return
1142 }
1143
1144 err = br.QueryRow().Scan(&n)
1145 if err != nil {
1146 errChan <- err
1147 return
1148 }
1149 if n != 2 {
1150 errChan <- fmt.Errorf("expected 2 got %v", n)
1151 return
1152 }
1153
1154 err = br.Close()
1155 errChan <- err
1156 }()
1157 }
1158
1159 for i := 0; i < testCount; i++ {
1160 err := <-errChan
1161 assert.NoError(t, err)
1162 }
1163 }
1164
View as plain text