ErrBadMessage is returned by functions so the pubsub message can be Acked and produce an error log.
For instance, json.Unmarshal errors should be joined by ErrBadMessage since since it can't be processed by the handler.
Functions should use BadMessageErrorf to provide the reason the message is bad.
var ErrBadMessage = fmt.Errorf("bad message")
ErrIgnoredMessage is returned by functions so the pubsub message can be Acked without producing an error log.
Functions should use IgnoredMessageErrorf to provide the reason the message is ignored.
var ErrIgnoredMessage = fmt.Errorf("ignored message")
var ErrPollMaxRetries = fmt.Errorf("maximum poll retries reached")
func BadMessageErrorf(msg string, values ...any) error
BadMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrBadMessage)`
func IgnoredMessageErrorf(msg string, values ...any) error
IgnoredMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrIgnoredMessage)`
func TxSetWatchedFieldObject(tx *sql.Tx, wf model.WatchedField) (*uuid.UUID, error)
func TxSetWatchedFieldValues(tx *sql.Tx, wf model.WatchedField, objectID *uuid.UUID) error
type Config struct { MetricsAddr string HealthzAddr string ForemanProjectID string TopicID string SubscriptionID string DBConnection string DBName string DBUsername string DBPassword string // for testing locally, not used when in the cloud. PollBannersPeriod time.Duration PollBannersMaxRetries int PollSubscriptionExistsPeriod time.Duration // DelayScrapeMessageProcessing prevents deleting outdated watched field objects from thrashing the database // with unnecessary writes & deletes due to pubsub's out-of-order message delivery. // // Tests use a smaller value to prevent unnecessary delays. DelayScrapeMessageProcessing time.Duration BannerCheckPeriod time.Duration GarbageCollectDeletedWatchedFieldObjects bool DB *sql.DB // for testing, the flags for DB will be ignored if set. TestPubSubConn *grpc.ClientConn // for testing, needed since using manager. }
func NewConfig() (*Config, error)
func (cfg *Config) ConnectToDB() (*sql.DB, error)
func (cfg *Config) Validate() error
type DBHandle struct { *sql.DB }
func (h *DBHandle) DeleteOutdatedWatchedFieldObjects(ctx context.Context, sm model.ScrapeMessage) error
func (h *DBHandle) DeleteWatchedField(ctx context.Context, wf model.WatchedField) error
func (h *DBHandle) GarbageCollectDeletedWatchedFieldObjects(ctx context.Context) (int, error)
GarbageCollectDeletedWatchedFieldObjects garbage collects watched field objects that have been deleted for over an hour. Since garbage collection requires a full table scan, it should only occur at startup when psqlinjector is not processing messages. Running this query before processing messages prevents deadlocks and performance hiccups in the database.
func (h *DBHandle) GetBannerProjectIDs(ctx context.Context) ([]string, error)
func (h *DBHandle) SetClusterHeartbeatTime(ctx context.Context, t time.Time, clusterEdgeID uuid.UUID) error
func (h *DBHandle) SetWatchedField(ctx context.Context, wf model.WatchedField) error
SetWatchedField upserts the watched field and cleans up outdated values. If the timestamp is out of date, no data is updated.
type PSQLInjector struct {
// contains filtered or unexported fields
}
func New(cfg *Config) (*PSQLInjector, error)
func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error
HandleMsg handles messages sent by the kinform client.
func (p *PSQLInjector) Run(ctx context.Context) error
Run starts psqlinjector.
This function returns nil when either the passed-in context was canceled, or the banners table changed.
type Receiver struct {
// contains filtered or unexported fields
}
func NewReceiver(projectID string, subscription *pubsub.Subscription, handler ReceiverHandler, pollPeriod time.Duration) *Receiver
func (r *Receiver) Close() error
Close is non-blocking and asynchronous. An error is returned if the Receiver hasn't been started.
func (r *Receiver) HealthCheck() ReceiverHealthCheck
HealthCheck calculates up-to-date metrics for the receiver.
func (r *Receiver) Start(ctx context.Context)
Start receives from the subscription. Start is self-healing and retries when errors occur. It only returns if the provided context is canceled or Close is called.
type ReceiverHandler func(context.Context, *pubsub.Message) error
ReceiverHealthCheck provides metrics for each Receiver since it was added to the BannerMux.
NOTE: There can be a delay for the Receiver to report itself as unhealthy. This is due to the nature of polling.
type ReceiverHealthCheck struct { RestartCount int TotalMessagesProcessed int TotalReceiveDuration time.Duration TotalMessagesAcked int TotalMessagesNacked int Healthy bool // Healthy indicates the Receiver is currently receiving messages CurrentReceiveDuration time.Duration CurrentMessagesProcessed int CurrentMessagesAcked int CurrentMessagesNacked int }
ReceiverMux is a pubsub.Subscription multiplexer.
It polls the PollFunc for projectIDs, dynamically subscribes to the configured pubsub.Subscription in that project, and routes every pubsub.Message to a common Handler function.
type ReceiverMux struct { sync.Mutex // protects "receivers" // contains filtered or unexported fields }
func NewReceiverMux(cfg *ReceiverMuxConfig) (*ReceiverMux, error)
func (rm *ReceiverMux) HealthCheck() ReceiverMuxHealthCheck
func (rm *ReceiverMux) Poll(ctx context.Context) (added, dropped []*Receiver, maxRetryError error)
func (rm *ReceiverMux) Run(ctx context.Context) error
type ReceiverMuxConfig struct { // Handler receives messages from each of the multiplexed subscriptions Handler ReceiverHandler // PollFunc returns slice of GCP Project IDs PollFunc func(context.Context) ([]string, error) PollPeriod time.Duration PollMaxRetries int SubscriptionID string TopicID string // TODO perhaps check this in the Receiver... or remove it since it's not used Conn *grpc.ClientConn // optional grpc client for testing ForemanProjectID string // needed to create a shared pubsub.Client // PollSubscriptionExistsPeriod is passed into Receivers to periodically check if their subscription still exists PollSubscriptionExistsPeriod time.Duration }
TODO plan on converting the health check's fields into prometheus metrics.
type ReceiverMuxHealthCheck struct { Count int HealthyCount int UnhealthyCount int Receivers map[string]ReceiverHealthCheck }