...

Source file src/edge-infra.dev/pkg/lib/db/postgres/default_clientv4.go

Documentation: edge-infra.dev/pkg/lib/db/postgres

     1  package postgres
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  
     8  	"github.com/go-logr/logr"
     9  	"github.com/jackc/pgconn"
    10  	"github.com/jackc/pgx/v4"
    11  	"github.com/jackc/pgx/v4/pgxpool"
    12  )
    13  
    14  type defaultClientV4 struct {
    15  	ctx    context.Context
    16  	logger logr.Logger
    17  
    18  	pool     *pgxpool.Pool
    19  	dialFunc pgconn.DialFunc
    20  }
    21  
    22  // assert interface implementation
    23  var _ Client = &defaultClient{}
    24  
    25  type defaultRowsV4 struct {
    26  	pgx.Rows
    27  }
    28  
    29  // assert interface implementation
    30  var _ Rows = &defaultRowsV4{}
    31  
    32  func (r *defaultRowsV4) Close() error {
    33  	r.Rows.Close()
    34  	return nil
    35  }
    36  
// TODO(help_wanted) - look into a better solution than supporting multiple versions of pgx. A
// potential option is https://github.com/GoogleCloudPlatform/cloud-sql-go-connector/compare/main...pgxv5
    39  
    40  // NewWithV4Dialer creates a PostgreSQL client that can work with connectors. This should only be
    41  // used for CloudSQL until support for pgx/v5 is supported.
    42  func NewWithV4Dialer(ctx context.Context, logger logr.Logger, opts DSNOptions, dialFunc pgconn.DialFunc) (Client, error) {
    43  	l := logger.WithValues("sql", "postgres")
    44  
    45  	dsn := opts.ToString()
    46  	config, err := pgxpool.ParseConfig(dsn)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  	l.Info("using dsn config", "connString", redactPW(config.ConnString()))
    51  
    52  	config.ConnConfig.DialFunc = dialFunc
    53  
    54  	pool, err := pgxpool.ConnectConfig(ctx, config)
    55  	if err != nil {
    56  		return nil, fmt.Errorf("sql.Open: %v", err)
    57  	}
    58  
    59  	return &defaultClientV4{
    60  		ctx:      ctx,
    61  		logger:   l,
    62  		pool:     pool,
    63  		dialFunc: dialFunc,
    64  	}, nil
    65  }
    66  
    67  // Connect is a no-op
    68  func (c *defaultClientV4) Connect() error {
    69  	return nil
    70  }
    71  
    72  // Close closes all connections in the pool and rejects future Acquire calls. Blocks until all
    73  // connections are returned to pool and closed
    74  func (c *defaultClientV4) Close() error {
    75  	c.pool.Close()
    76  	return nil
    77  }
    78  
    79  // IsConnected checks to see if connection is in a healthy state
    80  func (c *defaultClientV4) IsConnected() bool {
    81  	if c.pool == nil {
    82  		return false
    83  	}
    84  	return c.pool.Ping(c.ctx) == nil
    85  }
    86  
    87  // Insert ...
    88  func (c *defaultClientV4) Insert(statement string, args ...interface{}) error {
    89  	s := strings.TrimSpace(strings.ToLower(statement))
    90  	if !strings.HasPrefix(s, commandInsert) {
    91  		return fmt.Errorf("invalid sql command")
    92  	}
    93  	if _, err := c.pool.Exec(c.ctx, statement, args...); err != nil {
    94  		return err
    95  	}
    96  	return nil
    97  }
    98  
    99  // Query ...
   100  func (c *defaultClientV4) Query(statement string, args ...interface{}) (Rows, error) {
   101  	s := strings.TrimSpace(strings.ToLower(statement))
   102  	if !strings.HasPrefix(s, commandSelect) {
   103  		return nil, fmt.Errorf("invalid sql command")
   104  	}
   105  	rows, err := c.pool.Query(c.ctx, statement, args...)
   106  	if err != nil {
   107  		return nil, err
   108  	}
   109  	return &defaultRowsV4{rows}, nil
   110  }
   111  

View as plain text