1 package pgxpool
2
3 import (
4 "context"
5 "sync/atomic"
6
7 "github.com/jackc/pgconn"
8 "github.com/jackc/pgx/v4"
9 "github.com/jackc/puddle"
10 )
11
12
13 type Conn struct {
14 res *puddle.Resource
15 p *Pool
16 }
17
18
19
20 func (c *Conn) Release() {
21 if c.res == nil {
22 return
23 }
24
25 conn := c.Conn()
26 res := c.res
27 c.res = nil
28
29 if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
30 res.Destroy()
31
32
33 c.p.triggerHealthCheck()
34 return
35 }
36
37
38
39
40 if c.p.isExpired(res) {
41 atomic.AddInt64(&c.p.lifetimeDestroyCount, 1)
42 res.Destroy()
43
44
45 c.p.triggerHealthCheck()
46 return
47 }
48
49 if c.p.afterRelease == nil {
50 res.Release()
51 return
52 }
53
54 go func() {
55 if c.p.afterRelease(conn) {
56 res.Release()
57 } else {
58 res.Destroy()
59
60
61 c.p.triggerHealthCheck()
62 }
63 }()
64 }
65
66
67
68 func (c *Conn) Hijack() *pgx.Conn {
69 if c.res == nil {
70 panic("cannot hijack already released or hijacked connection")
71 }
72
73 conn := c.Conn()
74 res := c.res
75 c.res = nil
76
77 res.Hijack()
78
79 return conn
80 }
81
82 func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
83 return c.Conn().Exec(ctx, sql, arguments...)
84 }
85
86 func (c *Conn) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
87 return c.Conn().Query(ctx, sql, args...)
88 }
89
90 func (c *Conn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
91 return c.Conn().QueryRow(ctx, sql, args...)
92 }
93
94 func (c *Conn) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
95 return c.Conn().QueryFunc(ctx, sql, args, scans, f)
96 }
97
98 func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
99 return c.Conn().SendBatch(ctx, b)
100 }
101
102 func (c *Conn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
103 return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
104 }
105
106
107 func (c *Conn) Begin(ctx context.Context) (pgx.Tx, error) {
108 return c.Conn().Begin(ctx)
109 }
110
111
112 func (c *Conn) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
113 return c.Conn().BeginTx(ctx, txOptions)
114 }
115
116 func (c *Conn) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error {
117 return c.Conn().BeginFunc(ctx, f)
118 }
119
120 func (c *Conn) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, f func(pgx.Tx) error) error {
121 return c.Conn().BeginTxFunc(ctx, txOptions, f)
122 }
123
124 func (c *Conn) Ping(ctx context.Context) error {
125 return c.Conn().Ping(ctx)
126 }
127
128 func (c *Conn) Conn() *pgx.Conn {
129 return c.connResource().conn
130 }
131
132 func (c *Conn) connResource() *connResource {
133 return c.res.Value().(*connResource)
134 }
135
136 func (c *Conn) getPoolRow(r pgx.Row) *poolRow {
137 return c.connResource().getPoolRow(c, r)
138 }
139
140 func (c *Conn) getPoolRows(r pgx.Rows) *poolRows {
141 return c.connResource().getPoolRows(c, r)
142 }
143
View as plain text