...

Package server

import "edge-infra.dev/pkg/edge/psqlinjector"
Overview
Index
Subdirectories

Overview ▾

Index ▾

Variables
func BadMessageErrorf(msg string, values ...any) error
func IgnoredMessageErrorf(msg string, values ...any) error
func TxSetWatchedFieldObject(tx *sql.Tx, wf model.WatchedField) (*uuid.UUID, error)
func TxSetWatchedFieldValues(tx *sql.Tx, wf model.WatchedField, objectID *uuid.UUID) error
type Config
    func NewConfig() (*Config, error)
    func (cfg *Config) ConnectToDB() (*sql.DB, error)
    func (cfg *Config) Validate() error
type DBHandle
    func (h *DBHandle) DeleteOutdatedWatchedFieldObjects(ctx context.Context, sm model.ScrapeMessage) error
    func (h *DBHandle) DeleteWatchedField(ctx context.Context, wf model.WatchedField) error
    func (h *DBHandle) GarbageCollectDeletedWatchedFieldObjects(ctx context.Context) (int, error)
    func (h *DBHandle) GetBannerProjectIDs(ctx context.Context) ([]string, error)
    func (h *DBHandle) SetClusterHeartbeatTime(ctx context.Context, t time.Time, clusterEdgeID uuid.UUID) error
    func (h *DBHandle) SetWatchedField(ctx context.Context, wf model.WatchedField) error
type PSQLInjector
    func New(cfg *Config) (*PSQLInjector, error)
    func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error
    func (p *PSQLInjector) Run(ctx context.Context) error
type Receiver
    func NewReceiver(projectID string, subscription *pubsub.Subscription, handler ReceiverHandler, pollPeriod time.Duration) *Receiver
    func (r *Receiver) Close() error
    func (r *Receiver) HealthCheck() ReceiverHealthCheck
    func (r *Receiver) Start(ctx context.Context)
type ReceiverHandler
type ReceiverHealthCheck
type ReceiverMux
    func NewReceiverMux(cfg *ReceiverMuxConfig) (*ReceiverMux, error)
    func (rm *ReceiverMux) HealthCheck() ReceiverMuxHealthCheck
    func (rm *ReceiverMux) Poll(ctx context.Context) (added, dropped []*Receiver, maxRetryError error)
    func (rm *ReceiverMux) Run(ctx context.Context) error
type ReceiverMuxConfig
type ReceiverMuxHealthCheck

Package files

config.go db.go ps_server.go receivermux.go receivers.go sql.go subscriber.go

Variables

ErrBadMessage is returned by functions so the pubsub message can be Acked and produce an error log.

For instance, json.Unmarshal errors should be joined by ErrBadMessage since since it can't be processed by the handler.

Functions should use BadMessageErrorf to provide the reason the message is bad.

var ErrBadMessage = fmt.Errorf("bad message")

ErrIgnoredMessage is returned by functions so the pubsub message can be Acked without producing an error log.

Functions should use IgnoredMessageErrorf to provide the reason the message is ignored.

var ErrIgnoredMessage = fmt.Errorf("ignored message")
var ErrPollMaxRetries = fmt.Errorf("maximum poll retries reached")

func BadMessageErrorf

func BadMessageErrorf(msg string, values ...any) error

BadMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrBadMessage)`

func IgnoredMessageErrorf

func IgnoredMessageErrorf(msg string, values ...any) error

IgnoredMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrIgnoredMessage)`

func TxSetWatchedFieldObject

func TxSetWatchedFieldObject(tx *sql.Tx, wf model.WatchedField) (*uuid.UUID, error)

func TxSetWatchedFieldValues

func TxSetWatchedFieldValues(tx *sql.Tx, wf model.WatchedField, objectID *uuid.UUID) error

type Config

type Config struct {
    MetricsAddr string
    HealthzAddr string

    ForemanProjectID string
    TopicID          string
    SubscriptionID   string

    DBConnection string
    DBName       string
    DBUsername   string
    DBPassword   string // for testing locally, not used when in the cloud.

    PollBannersPeriod     time.Duration
    PollBannersMaxRetries int

    PollSubscriptionExistsPeriod time.Duration

    // DelayScrapeMessageProcessing prevents deleting outdated watched field objects from thrashing the database
    // with unnecessary writes & deletes due to pubsub's out-of-order message delivery.
    //
    // Tests use a smaller value to prevent unnecessary delays.
    DelayScrapeMessageProcessing time.Duration

    BannerCheckPeriod time.Duration

    GarbageCollectDeletedWatchedFieldObjects bool

    DB             *sql.DB          // for testing, the flags for DB will be ignored if set.
    TestPubSubConn *grpc.ClientConn // for testing, needed since using manager.
}

func NewConfig

func NewConfig() (*Config, error)

func (*Config) ConnectToDB

func (cfg *Config) ConnectToDB() (*sql.DB, error)

func (*Config) Validate

func (cfg *Config) Validate() error

type DBHandle

type DBHandle struct {
    *sql.DB
}

func (*DBHandle) DeleteOutdatedWatchedFieldObjects

func (h *DBHandle) DeleteOutdatedWatchedFieldObjects(ctx context.Context, sm model.ScrapeMessage) error

func (*DBHandle) DeleteWatchedField

func (h *DBHandle) DeleteWatchedField(ctx context.Context, wf model.WatchedField) error

func (*DBHandle) GarbageCollectDeletedWatchedFieldObjects

func (h *DBHandle) GarbageCollectDeletedWatchedFieldObjects(ctx context.Context) (int, error)

GarbageCollectDeletedWatchedFieldObjects garbage collects watched field objects that have been deleted for over an hour. Since garbage collection requires a full table scan, it should only occur at startup when psqlinjector is not processing messages. Running this query before processing messages prevents deadlocks and performance hiccups in the database.

func (*DBHandle) GetBannerProjectIDs

func (h *DBHandle) GetBannerProjectIDs(ctx context.Context) ([]string, error)

func (*DBHandle) SetClusterHeartbeatTime

func (h *DBHandle) SetClusterHeartbeatTime(ctx context.Context, t time.Time, clusterEdgeID uuid.UUID) error

func (*DBHandle) SetWatchedField

func (h *DBHandle) SetWatchedField(ctx context.Context, wf model.WatchedField) error

SetWatchedField upserts the watched field and cleans up outdated values. If the timestamp is out of date, no data is updated.

type PSQLInjector

type PSQLInjector struct {
    // contains filtered or unexported fields
}

func New

func New(cfg *Config) (*PSQLInjector, error)

func (*PSQLInjector) HandleMsg

func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error

HandleMsg handles messages sent by the kinform client.

func (*PSQLInjector) Run

func (p *PSQLInjector) Run(ctx context.Context) error

Run starts psqlinjector.

This function returns nil when either the passed-in context was canceled, or the banners table changed.

type Receiver

type Receiver struct {
    // contains filtered or unexported fields
}

func NewReceiver

func NewReceiver(projectID string, subscription *pubsub.Subscription, handler ReceiverHandler, pollPeriod time.Duration) *Receiver

func (*Receiver) Close

func (r *Receiver) Close() error

Close is non-blocking and asynchronous. An error is returned if the Receiver hasn't been started.

func (*Receiver) HealthCheck

func (r *Receiver) HealthCheck() ReceiverHealthCheck

HealthCheck calculates up-to-date metrics for the receiver.

func (*Receiver) Start

func (r *Receiver) Start(ctx context.Context)

Start receives from the subscription. Start is self-healing and retries when errors occur. It only returns if the provided context is canceled or Close is called.

type ReceiverHandler

type ReceiverHandler func(context.Context, *pubsub.Message) error

type ReceiverHealthCheck

ReceiverHealthCheck provides metrics for each Receiver since it was added to the BannerMux.

NOTE: There can be a delay for the Receiver to report itself as unhealthy. This is due to the nature of polling.

type ReceiverHealthCheck struct {
    RestartCount           int
    TotalMessagesProcessed int
    TotalReceiveDuration   time.Duration
    TotalMessagesAcked     int
    TotalMessagesNacked    int

    Healthy                  bool // Healthy indicates the Receiver is currently receiving messages
    CurrentReceiveDuration   time.Duration
    CurrentMessagesProcessed int
    CurrentMessagesAcked     int
    CurrentMessagesNacked    int
}

type ReceiverMux

ReceiverMux is a pubsub.Subscription multiplexer.

It polls the PollFunc for projectIDs, dynamically subscribes to the configured pubsub.Subscription in that project, and routes every pubsub.Message to a common Handler function.

type ReceiverMux struct {
    sync.Mutex // protects "receivers"
    // contains filtered or unexported fields
}

func NewReceiverMux

func NewReceiverMux(cfg *ReceiverMuxConfig) (*ReceiverMux, error)

func (*ReceiverMux) HealthCheck

func (rm *ReceiverMux) HealthCheck() ReceiverMuxHealthCheck

func (*ReceiverMux) Poll

func (rm *ReceiverMux) Poll(ctx context.Context) (added, dropped []*Receiver, maxRetryError error)

func (*ReceiverMux) Run

func (rm *ReceiverMux) Run(ctx context.Context) error

type ReceiverMuxConfig

type ReceiverMuxConfig struct {
    // Handler receives messages from each of the multiplexed subscriptions
    Handler ReceiverHandler

    // PollFunc returns slice of GCP Project IDs
    PollFunc       func(context.Context) ([]string, error)
    PollPeriod     time.Duration
    PollMaxRetries int

    SubscriptionID string
    TopicID        string // TODO perhaps check this in the Receiver... or remove it since it's not used

    Conn             *grpc.ClientConn // optional grpc client for testing
    ForemanProjectID string           // needed to create a shared pubsub.Client

    // PollSubscriptionExistsPeriod is passed into Receivers to periodically check if their subscription still exists
    PollSubscriptionExistsPeriod time.Duration
}

type ReceiverMuxHealthCheck

TODO plan on converting the health check's fields into prometheus metrics.

type ReceiverMuxHealthCheck struct {
    Count          int
    HealthyCount   int
    UnhealthyCount int
    Receivers      map[string]ReceiverHealthCheck
}

Subdirectories

Name Synopsis
..
metrics