package postgres import ( "context" "fmt" "net/url" "regexp" "strings" "github.com/go-logr/logr" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" ) const ( commandInsert = "insert" commandSelect = "select" ) type defaultClient struct { ctx context.Context logger logr.Logger pool *pgxpool.Pool dialFunc pgconn.DialFunc } // assert interface implementation var _ Client = &defaultClient{} type defaultRows struct { pgx.Rows } // assert interface implementation var _ Rows = &defaultRows{} func (r *defaultRows) Close() error { r.Rows.Close() return nil } // TODO(help_wanted) - look into better solution for ensuring SQL command wrappers validate // intended usage // New creates a generic PostgreSQL client. Should be used when no connector is needed. func New(ctx context.Context, logger logr.Logger, opts DSNOptions) (Client, error) { l := logger.WithValues("sql", "postgres") dsn := opts.ToString() config, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, err } l.Info("using dsn config", "connString", redactPW(config.ConnString())) pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { return nil, err } return &defaultClient{ ctx: ctx, logger: l, pool: pool, }, nil } // NewWithDialer creates a PostgreSQL client that can work with connectors // **NOTE:** cloud-sql-go-connector and alloydb-go-connector do not support this version yet // https://github.com/GoogleCloudPlatform/cloud-sql-go-connector/issues/335 func NewWithDialer(ctx context.Context, logger logr.Logger, opts DSNOptions, dialFunc pgconn.DialFunc) (Client, error) { l := logger.WithValues("sql", "postgres") dsn := opts.ToString() config, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, err } l.Info("using dsn config", "connString", redactPW(config.ConnString())) // config.ConnConfig.Config.DialFunc config.ConnConfig.DialFunc = dialFunc pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { return nil, err } return &defaultClient{ ctx: ctx, logger: l, pool: pool, dialFunc: dialFunc, }, nil } // Connect is a no-op so just run Ping to test func (c *defaultClient) Connect() error { return c.pool.Ping(c.ctx) } // Close closes all connections in the pool and rejects future Acquire calls. Blocks until all // connections are returned to pool and closed func (c *defaultClient) Close() error { c.pool.Close() return nil } // IsConnected checks to see if connection is in a healthy state func (c *defaultClient) IsConnected() bool { if c.pool == nil { return false } return c.pool.Ping(c.ctx) == nil } // Insert ... func (c *defaultClient) Insert(statement string, args ...interface{}) error { s := strings.TrimSpace(strings.ToLower(statement)) if !strings.HasPrefix(s, commandInsert) { return fmt.Errorf("invalid sql command") } if _, err := c.pool.Exec(c.ctx, statement, args...); err != nil { return err } return nil } // Query ... func (c *defaultClient) Query(statement string, args ...interface{}) (Rows, error) { s := strings.TrimSpace(strings.ToLower(statement)) if !strings.HasPrefix(s, commandSelect) { return nil, fmt.Errorf("invalid sql command") } rows, err := c.pool.Query(c.ctx, statement, args...) if err != nil { return nil, err } return &defaultRows{rows}, nil } func redactPW(connString string) string { if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { if u, err := url.Parse(connString); err == nil { return redactURL(u) } } quotedDSN := regexp.MustCompile(`password='[^']*'`) connString = quotedDSN.ReplaceAllLiteralString(connString, "password=xxxxx") plainDSN := regexp.MustCompile(`password=[^ ]*`) connString = plainDSN.ReplaceAllLiteralString(connString, "password=xxxxx") brokenURL := regexp.MustCompile(`:[^:@]+?@`) connString = brokenURL.ReplaceAllLiteralString(connString, ":xxxxxx@") return connString } func redactURL(u *url.URL) string { if u == nil { return "" } if _, pwSet := u.User.Password(); pwSet { u.User = url.UserPassword(u.User.Username(), "xxxxx") } return u.String() }