...
1 package postgres
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7
8 "github.com/go-logr/logr"
9 "github.com/jackc/pgconn"
10 "github.com/jackc/pgx/v4"
11 "github.com/jackc/pgx/v4/pgxpool"
12 )
13
14 type defaultClientV4 struct {
15 ctx context.Context
16 logger logr.Logger
17
18 pool *pgxpool.Pool
19 dialFunc pgconn.DialFunc
20 }
21
22
23 var _ Client = &defaultClient{}
24
25 type defaultRowsV4 struct {
26 pgx.Rows
27 }
28
29
30 var _ Rows = &defaultRowsV4{}
31
32 func (r *defaultRowsV4) Close() error {
33 r.Rows.Close()
34 return nil
35 }
36
37
38
39
40
41
42 func NewWithV4Dialer(ctx context.Context, logger logr.Logger, opts DSNOptions, dialFunc pgconn.DialFunc) (Client, error) {
43 l := logger.WithValues("sql", "postgres")
44
45 dsn := opts.ToString()
46 config, err := pgxpool.ParseConfig(dsn)
47 if err != nil {
48 return nil, err
49 }
50 l.Info("using dsn config", "connString", redactPW(config.ConnString()))
51
52 config.ConnConfig.DialFunc = dialFunc
53
54 pool, err := pgxpool.ConnectConfig(ctx, config)
55 if err != nil {
56 return nil, fmt.Errorf("sql.Open: %v", err)
57 }
58
59 return &defaultClientV4{
60 ctx: ctx,
61 logger: l,
62 pool: pool,
63 dialFunc: dialFunc,
64 }, nil
65 }
66
67
68 func (c *defaultClientV4) Connect() error {
69 return nil
70 }
71
72
73
74 func (c *defaultClientV4) Close() error {
75 c.pool.Close()
76 return nil
77 }
78
79
80 func (c *defaultClientV4) IsConnected() bool {
81 if c.pool == nil {
82 return false
83 }
84 return c.pool.Ping(c.ctx) == nil
85 }
86
87
88 func (c *defaultClientV4) Insert(statement string, args ...interface{}) error {
89 s := strings.TrimSpace(strings.ToLower(statement))
90 if !strings.HasPrefix(s, commandInsert) {
91 return fmt.Errorf("invalid sql command")
92 }
93 if _, err := c.pool.Exec(c.ctx, statement, args...); err != nil {
94 return err
95 }
96 return nil
97 }
98
99
100 func (c *defaultClientV4) Query(statement string, args ...interface{}) (Rows, error) {
101 s := strings.TrimSpace(strings.ToLower(statement))
102 if !strings.HasPrefix(s, commandSelect) {
103 return nil, fmt.Errorf("invalid sql command")
104 }
105 rows, err := c.pool.Query(c.ctx, statement, args...)
106 if err != nil {
107 return nil, err
108 }
109 return &defaultRowsV4{rows}, nil
110 }
111
View as plain text