1 package pgxpool
2
3 import (
4 "context"
5 "fmt"
6 "math/rand"
7 "runtime"
8 "strconv"
9 "sync"
10 "sync/atomic"
11 "time"
12
13 "github.com/jackc/pgconn"
14 "github.com/jackc/pgx/v4"
15 "github.com/jackc/puddle"
16 )
17
// Fallback pool settings. ParseConfig uses them whenever the connection
// string does not carry the corresponding pool_* parameter.
var (
	defaultMaxConns          = int32(4)
	defaultMinConns          = int32(0)
	defaultMaxConnLifetime   = time.Hour
	defaultMaxConnIdleTime   = 30 * time.Minute
	defaultHealthCheckPeriod = time.Minute
)
23
// connResource is the value stored in each puddle resource: the underlying
// pgx connection plus preallocated slabs of wrapper objects.
type connResource struct {
	// conn is the underlying database connection.
	conn *pgx.Conn
	// conns, poolRows and poolRowss are slabs that getConn, getPoolRow and
	// getPoolRows consume from the tail, handing out pointers to their
	// elements. An exhausted slab is replaced by a fresh one.
	conns     []Conn
	poolRows  []poolRow
	poolRowss []poolRows
}
30
31 func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn {
32 if len(cr.conns) == 0 {
33 cr.conns = make([]Conn, 128)
34 }
35
36 c := &cr.conns[len(cr.conns)-1]
37 cr.conns = cr.conns[0 : len(cr.conns)-1]
38
39 c.res = res
40 c.p = p
41
42 return c
43 }
44
45 func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
46 if len(cr.poolRows) == 0 {
47 cr.poolRows = make([]poolRow, 128)
48 }
49
50 pr := &cr.poolRows[len(cr.poolRows)-1]
51 cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
52
53 pr.c = c
54 pr.r = r
55
56 return pr
57 }
58
59 func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
60 if len(cr.poolRowss) == 0 {
61 cr.poolRowss = make([]poolRows, 128)
62 }
63
64 pr := &cr.poolRowss[len(cr.poolRowss)-1]
65 cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
66
67 pr.c = c
68 pr.r = r
69
70 return pr
71 }
72
73
74
// detachedCtx wraps a context so that it never reports cancellation, a
// deadline, or an error. Everything else (for example Value) is still served
// by the embedded context.
type detachedCtx struct {
	context.Context
}

// Done always returns a nil channel, so a receive on it never completes.
func (detachedCtx) Done() <-chan struct{} {
	return nil
}

// Deadline always reports that no deadline is set.
func (detachedCtx) Deadline() (time.Time, bool) {
	var none time.Time
	return none, false
}

// Err always returns nil, regardless of the state of the wrapped context.
func (detachedCtx) Err() error {
	return nil
}
82
83
// Pool is a connection pool built on top of a puddle.Pool. It is created by
// Connect or ConnectConfig and shut down with Close.
type Pool struct {
	// These 64-bit counters are read and updated with sync/atomic. They are
	// kept at the start of the struct so they stay 64-bit aligned on 32-bit
	// platforms (see the sync/atomic alignment note); do not move them.
	newConnsCount        int64
	lifetimeDestroyCount int64
	idleDestroyCount     int64

	// p is the underlying resource pool; its values are *connResource.
	p *puddle.Pool
	// config is the configuration the pool was created with. The remaining
	// fields up to healthCheckPeriod are copied from it by ConnectConfig.
	config                *Config
	beforeConnect         func(context.Context, *pgx.ConnConfig) error
	afterConnect          func(context.Context, *pgx.Conn) error
	beforeAcquire         func(context.Context, *pgx.Conn) bool
	afterRelease          func(*pgx.Conn) bool
	minConns              int32
	maxConns              int32
	maxConnLifetime       time.Duration
	maxConnLifetimeJitter time.Duration
	maxConnIdleTime       time.Duration
	healthCheckPeriod     time.Duration
	// healthCheckChan (buffer size 1) carries on-demand health check requests
	// from triggerHealthCheck to backgroundHealthCheck.
	healthCheckChan chan struct{}

	// closeOnce guards Close; closeChan is closed by Close to stop the
	// background health check goroutine.
	closeOnce sync.Once
	closeChan chan struct{}
}
108
109
110
// Config is the configuration struct for creating a pool. It must be created
// by ParseConfig and then it can be modified; ConnectConfig panics on a
// Config that was not produced by ParseConfig.
type Config struct {
	// ConnConfig is the connection configuration. A copy of it is made for
	// every new connection the pool opens.
	ConnConfig *pgx.ConnConfig

	// BeforeConnect is called before a new connection is made. It is passed a
	// copy of ConnConfig that it may modify; a non-nil error aborts the
	// connection attempt.
	BeforeConnect func(context.Context, *pgx.ConnConfig) error

	// AfterConnect is called after a connection is established, before it is
	// added to the pool. If it returns an error the connection is closed and
	// creation fails with that error.
	AfterConnect func(context.Context, *pgx.Conn) error

	// BeforeAcquire is called before a connection is handed out by Acquire or
	// AcquireAllIdle. It must return true to allow the acquisition; on false
	// the connection is destroyed (and Acquire tries another one).
	BeforeAcquire func(context.Context, *pgx.Conn) bool

	// AfterRelease is copied into the pool but not invoked in this part of the
	// file. NOTE(review): presumably called when a connection is released,
	// with false meaning "destroy the connection" — confirm against Conn.Release.
	AfterRelease func(*pgx.Conn) bool

	// MaxConnLifetime is the age after which an idle connection is destroyed
	// by the health check. ParseConfig defaults it to one hour.
	MaxConnLifetime time.Duration

	// MaxConnLifetimeJitter is the upper bound of a random duration added to
	// MaxConnLifetime when deciding whether a connection has expired.
	// ParseConfig leaves it at zero unless pool_max_conn_lifetime_jitter is set.
	MaxConnLifetimeJitter time.Duration

	// MaxConnIdleTime is the idle duration after which the health check
	// destroys a connection, as long as the pool stays above MinConns.
	// ParseConfig defaults it to 30 minutes.
	MaxConnIdleTime time.Duration

	// MaxConns is the size passed to the underlying puddle pool. ParseConfig
	// defaults it to the greater of 4 and runtime.NumCPU() and rejects values
	// below 1 in pool_max_conns.
	MaxConns int32

	// MinConns is the number of connections the health check tries to keep
	// open: when the total drops below it, new idle connections are created.
	// ParseConfig defaults it to 0.
	MinConns int32

	// HealthCheckPeriod is the interval between periodic health checks.
	// ParseConfig defaults it to one minute.
	HealthCheckPeriod time.Duration

	// LazyConnect, when true, makes ConnectConfig return without opening any
	// connection. When false, ConnectConfig creates MinConns connections and
	// acquires one connection before returning, failing if either step fails.
	LazyConnect bool

	// createdByParseConfig is set by ParseConfig and checked by ConnectConfig.
	createdByParseConfig bool
}
158
159
160
161
162 func (c *Config) Copy() *Config {
163 newConfig := new(Config)
164 *newConfig = *c
165 newConfig.ConnConfig = c.ConnConfig.Copy()
166 return newConfig
167 }
168
169
170 func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
171
172
173
174 func Connect(ctx context.Context, connString string) (*Pool, error) {
175 config, err := ParseConfig(connString)
176 if err != nil {
177 return nil, err
178 }
179
180 return ConnectConfig(ctx, config)
181 }
182
183
184
185 func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
186
187
188 if !config.createdByParseConfig {
189 panic("config must be created by ParseConfig")
190 }
191
192 p := &Pool{
193 config: config,
194 beforeConnect: config.BeforeConnect,
195 afterConnect: config.AfterConnect,
196 beforeAcquire: config.BeforeAcquire,
197 afterRelease: config.AfterRelease,
198 minConns: config.MinConns,
199 maxConns: config.MaxConns,
200 maxConnLifetime: config.MaxConnLifetime,
201 maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
202 maxConnIdleTime: config.MaxConnIdleTime,
203 healthCheckPeriod: config.HealthCheckPeriod,
204 healthCheckChan: make(chan struct{}, 1),
205 closeChan: make(chan struct{}),
206 }
207
208 p.p = puddle.NewPool(
209 func(ctx context.Context) (interface{}, error) {
210
211
212
213
214
215
216
217 ctx = detachedCtx{ctx}
218
219 connConfig := p.config.ConnConfig.Copy()
220
221
222 if connConfig.ConnectTimeout <= 0 {
223 connConfig.ConnectTimeout = 2 * time.Minute
224 }
225
226 if p.beforeConnect != nil {
227 if err := p.beforeConnect(ctx, connConfig); err != nil {
228 return nil, err
229 }
230 }
231
232 conn, err := pgx.ConnectConfig(ctx, connConfig)
233 if err != nil {
234 return nil, err
235 }
236
237 if p.afterConnect != nil {
238 err = p.afterConnect(ctx, conn)
239 if err != nil {
240 conn.Close(ctx)
241 return nil, err
242 }
243 }
244
245 cr := &connResource{
246 conn: conn,
247 conns: make([]Conn, 64),
248 poolRows: make([]poolRow, 64),
249 poolRowss: make([]poolRows, 64),
250 }
251
252 return cr, nil
253 },
254 func(value interface{}) {
255 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
256 conn := value.(*connResource).conn
257 conn.Close(ctx)
258 select {
259 case <-conn.PgConn().CleanupDone():
260 case <-ctx.Done():
261 }
262 cancel()
263 },
264 config.MaxConns,
265 )
266
267 if !config.LazyConnect {
268 if err := p.checkMinConnsWithContext(ctx); err != nil {
269
270 p.Close()
271 return nil, err
272 }
273
274
275 res, err := p.p.Acquire(ctx)
276 if err != nil {
277 p.Close()
278 return nil, err
279 }
280 res.Release()
281 }
282
283 go p.backgroundHealthCheck()
284
285 return p, nil
286 }
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305 func ParseConfig(connString string) (*Config, error) {
306 connConfig, err := pgx.ParseConfig(connString)
307 if err != nil {
308 return nil, err
309 }
310
311 config := &Config{
312 ConnConfig: connConfig,
313 createdByParseConfig: true,
314 }
315
316 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
317 delete(connConfig.Config.RuntimeParams, "pool_max_conns")
318 n, err := strconv.ParseInt(s, 10, 32)
319 if err != nil {
320 return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err)
321 }
322 if n < 1 {
323 return nil, fmt.Errorf("pool_max_conns too small: %d", n)
324 }
325 config.MaxConns = int32(n)
326 } else {
327 config.MaxConns = defaultMaxConns
328 if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
329 config.MaxConns = numCPU
330 }
331 }
332
333 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
334 delete(connConfig.Config.RuntimeParams, "pool_min_conns")
335 n, err := strconv.ParseInt(s, 10, 32)
336 if err != nil {
337 return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err)
338 }
339 config.MinConns = int32(n)
340 } else {
341 config.MinConns = defaultMinConns
342 }
343
344 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
345 delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
346 d, err := time.ParseDuration(s)
347 if err != nil {
348 return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err)
349 }
350 config.MaxConnLifetime = d
351 } else {
352 config.MaxConnLifetime = defaultMaxConnLifetime
353 }
354
355 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok {
356 delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time")
357 d, err := time.ParseDuration(s)
358 if err != nil {
359 return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err)
360 }
361 config.MaxConnIdleTime = d
362 } else {
363 config.MaxConnIdleTime = defaultMaxConnIdleTime
364 }
365
366 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
367 delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
368 d, err := time.ParseDuration(s)
369 if err != nil {
370 return nil, fmt.Errorf("invalid pool_health_check_period: %w", err)
371 }
372 config.HealthCheckPeriod = d
373 } else {
374 config.HealthCheckPeriod = defaultHealthCheckPeriod
375 }
376
377 if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
378 delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
379 d, err := time.ParseDuration(s)
380 if err != nil {
381 return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
382 }
383 config.MaxConnLifetimeJitter = d
384 }
385
386 return config, nil
387 }
388
389
390
391 func (p *Pool) Close() {
392 p.closeOnce.Do(func() {
393 close(p.closeChan)
394 p.p.Close()
395 })
396 }
397
398 func (p *Pool) isExpired(res *puddle.Resource) bool {
399 now := time.Now()
400
401
402 if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter {
403 return true
404 }
405 if p.maxConnLifetimeJitter == 0 {
406 return false
407 }
408 jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds()
409 return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second)
410 }
411
412 func (p *Pool) triggerHealthCheck() {
413 go func() {
414
415
416 time.Sleep(500 * time.Millisecond)
417 select {
418 case p.healthCheckChan <- struct{}{}:
419 default:
420 }
421 }()
422 }
423
424 func (p *Pool) backgroundHealthCheck() {
425 ticker := time.NewTicker(p.healthCheckPeriod)
426 defer ticker.Stop()
427 for {
428 select {
429 case <-p.closeChan:
430 return
431 case <-p.healthCheckChan:
432 p.checkHealth()
433 case <-ticker.C:
434 p.checkHealth()
435 }
436 }
437 }
438
439 func (p *Pool) checkHealth() {
440 for {
441
442
443 if err := p.checkMinConns(); err != nil {
444
445 break
446 }
447 if !p.checkConnsHealth() {
448
449 break
450 }
451
452
453 select {
454 case <-p.closeChan:
455 return
456 case <-time.After(500 * time.Millisecond):
457 }
458 }
459 }
460
461
462
463 func (p *Pool) checkConnsHealth() bool {
464 var destroyed bool
465 totalConns := p.Stat().TotalConns()
466 resources := p.p.AcquireAllIdle()
467 for _, res := range resources {
468
469 if p.isExpired(res) && totalConns >= p.minConns {
470 atomic.AddInt64(&p.lifetimeDestroyCount, 1)
471 res.Destroy()
472 destroyed = true
473
474 totalConns--
475 } else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
476 atomic.AddInt64(&p.idleDestroyCount, 1)
477 res.Destroy()
478 destroyed = true
479
480 totalConns--
481 } else {
482 res.ReleaseUnused()
483 }
484 }
485 return destroyed
486 }
487
488 func (p *Pool) checkMinConnsWithContext(ctx context.Context) error {
489
490
491
492 toCreate := p.minConns - p.Stat().TotalConns()
493 if toCreate > 0 {
494 return p.createIdleResources(ctx, int(toCreate))
495 }
496 return nil
497 }
498
499 func (p *Pool) checkMinConns() error {
500 return p.checkMinConnsWithContext(context.Background())
501 }
502
503 func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
504 ctx, cancel := context.WithCancel(parentCtx)
505 defer cancel()
506
507 errs := make(chan error, targetResources)
508
509 for i := 0; i < targetResources; i++ {
510 go func() {
511 atomic.AddInt64(&p.newConnsCount, 1)
512 err := p.p.CreateResource(ctx)
513 errs <- err
514 }()
515 }
516
517 var firstError error
518 for i := 0; i < targetResources; i++ {
519 err := <-errs
520 if err != nil && firstError == nil {
521 cancel()
522 firstError = err
523 }
524 }
525
526 return firstError
527 }
528
529
530 func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
531 for {
532 res, err := p.p.Acquire(ctx)
533 if err != nil {
534 return nil, err
535 }
536
537 cr := res.Value().(*connResource)
538 if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
539 return cr.getConn(p, res), nil
540 }
541
542 res.Destroy()
543 }
544 }
545
546
547
548
549 func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error {
550 conn, err := p.Acquire(ctx)
551 if err != nil {
552 return err
553 }
554 defer conn.Release()
555
556 return f(conn)
557 }
558
559
560
561 func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn {
562 resources := p.p.AcquireAllIdle()
563 conns := make([]*Conn, 0, len(resources))
564 for _, res := range resources {
565 cr := res.Value().(*connResource)
566 if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
567 conns = append(conns, cr.getConn(p, res))
568 } else {
569 res.Destroy()
570 }
571 }
572
573 return conns
574 }
575
576
577 func (p *Pool) Config() *Config { return p.config.Copy() }
578
579
580 func (p *Pool) Stat() *Stat {
581 return &Stat{
582 s: p.p.Stat(),
583 newConnsCount: atomic.LoadInt64(&p.newConnsCount),
584 lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
585 idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount),
586 }
587 }
588
589
590
591
592
593 func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
594 c, err := p.Acquire(ctx)
595 if err != nil {
596 return nil, err
597 }
598 defer c.Release()
599
600 return c.Exec(ctx, sql, arguments...)
601 }
602
603
604
605
606
607
608
609
610
611
612
613 func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
614 c, err := p.Acquire(ctx)
615 if err != nil {
616 return errRows{err: err}, err
617 }
618
619 rows, err := c.Query(ctx, sql, args...)
620 if err != nil {
621 c.Release()
622 return errRows{err: err}, err
623 }
624
625 return c.getPoolRows(rows), nil
626 }
627
628
629
630
631
632
633
634
635
636
637
638
639
640 func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
641 c, err := p.Acquire(ctx)
642 if err != nil {
643 return errRow{err: err}
644 }
645
646 row := c.QueryRow(ctx, sql, args...)
647 return c.getPoolRow(row)
648 }
649
650 func (p *Pool) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
651 c, err := p.Acquire(ctx)
652 if err != nil {
653 return nil, err
654 }
655 defer c.Release()
656
657 return c.QueryFunc(ctx, sql, args, scans, f)
658 }
659
660 func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
661 c, err := p.Acquire(ctx)
662 if err != nil {
663 return errBatchResults{err: err}
664 }
665
666 br := c.SendBatch(ctx, b)
667 return &poolBatchResults{br: br, c: c}
668 }
669
670
671
672
673
674 func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) {
675 return p.BeginTx(ctx, pgx.TxOptions{})
676 }
677
678
679
680
681
682 func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
683 c, err := p.Acquire(ctx)
684 if err != nil {
685 return nil, err
686 }
687
688 t, err := c.BeginTx(ctx, txOptions)
689 if err != nil {
690 c.Release()
691 return nil, err
692 }
693
694 return &Tx{t: t, c: c}, nil
695 }
696
697 func (p *Pool) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error {
698 return p.BeginTxFunc(ctx, pgx.TxOptions{}, f)
699 }
700
701 func (p *Pool) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, f func(pgx.Tx) error) error {
702 c, err := p.Acquire(ctx)
703 if err != nil {
704 return err
705 }
706 defer c.Release()
707
708 return c.BeginTxFunc(ctx, txOptions, f)
709 }
710
711 func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
712 c, err := p.Acquire(ctx)
713 if err != nil {
714 return 0, err
715 }
716 defer c.Release()
717
718 return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
719 }
720
721
722
723 func (p *Pool) Ping(ctx context.Context) error {
724 c, err := p.Acquire(ctx)
725 if err != nil {
726 return err
727 }
728 defer c.Release()
729 return c.Ping(ctx)
730 }
731
View as plain text