package lighthouse

import (
	"context"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"github.com/lib/pq"

	"edge-infra.dev/pkg/edge/lighthouse/event"
	"edge-infra.dev/pkg/lib/gcp/pubsub"
)

const (
	// minReconnectInterval is the default minimum
	// reconnect interval for the PG listener.
	minReconnectInterval = time.Second * 5
	// maxReconnectInterval is the default maximum
	// reconnect interval for the PG listener.
	maxReconnectInterval = time.Minute * 5
	// DefaultLivelinessProbe is the default time to wait
	// for a message before pinging the connection for liveliness.
	DefaultLivelinessProbe = time.Minute * 20
	// DefaultMaxConcurrent is the default maximum number of goroutines
	// that can be spawned by the daemon to process the PG notify events.
	DefaultMaxConcurrent = 1000
)

// WatchTower represents a listener for PG notifications and events.
type WatchTower struct {
	// Mutex is held by the channel registration methods while they
	// read or modify channels.
	sync.Mutex
	// channels is the set of channel names being listened on for PG events.
	channels map[string]struct{}
	// listener is the lib/pq listener used to receive events from PG.
	listener *pq.Listener
	// livelinessProbe is the wait time for a message before the connection is pinged for liveliness.
	livelinessProbe time.Duration
	// maxConcurrent is the maximum number of goroutines that can be spawned to handle PG events.
	maxConcurrent int64
	// projectID is the gcp project.
	projectID string
	// pubsubClient is the gcp pubsub client.
	pubsubClient pubsub.Publisher
	// topicID is the gcp pubsub topic the pubsub messages are sent to.
	topicID string
	// ticker is the wait time before querying the database for unacknowledged events.
	ticker *time.Ticker
}

// NotificationEvent represents the PG notification stored in the database.
type NotificationEvent struct {
	ID           string
	Channel      string
	Operation    string
	Record       interface{}
	Acknowledged bool
}

// NewWatchTower returns a new WatchTower with an empty channel set and the
// default liveliness probe, concurrency limit and ticker. No listener or
// pubsub client is attached yet.
func NewWatchTower() *WatchTower {
	tower := new(WatchTower)
	tower.channels = map[string]struct{}{}
	tower.livelinessProbe = DefaultLivelinessProbe
	tower.maxConcurrent = DefaultMaxConcurrent
	tower.ticker = time.NewTicker(DefaultLivelinessProbe)
	return tower
}

// SetLivelinessProbe stores the duration to wait for a message before the
// connection is pinged, and returns the receiver so calls can be chained.
func (w *WatchTower) SetLivelinessProbe(liveliness time.Duration) *WatchTower {
	w.livelinessProbe = liveliness
	return w
}

// GetLivelinessProbe reports the configured liveliness probe duration.
func (w *WatchTower) GetLivelinessProbe() time.Duration {
	return w.livelinessProbe
}

// SetMaxConcurrent stores the upper bound on concurrently running event
// goroutines, and returns the receiver so calls can be chained.
func (w *WatchTower) SetMaxConcurrent(max int64) *WatchTower {
	w.maxConcurrent = max
	return w
}

// GetMaxConcurrent reports the configured upper bound on concurrently
// running event goroutines.
func (w *WatchTower) GetMaxConcurrent() int64 {
	return w.maxConcurrent
}

// RegisterChannel starts listening on channel for PG events. Registering a
// channel that is already tracked is a no-op; an error from the listener is
// returned unchanged and the channel is not recorded.
func (w *WatchTower) RegisterChannel(channel string) error {
	w.Mutex.Lock()
	defer w.Mutex.Unlock()

	if _, known := w.channels[channel]; known {
		return nil
	}
	if err := w.listener.Listen(channel); err != nil {
		return err
	}
	w.channels[channel] = struct{}{}
	return nil
}

// ChannelExists reports whether channel is currently registered.
func (w *WatchTower) ChannelExists(channel string) bool {
	w.Mutex.Lock()
	_, found := w.channels[channel]
	w.Mutex.Unlock()
	return found
}

// UnregisterChannel stops listening on channel when it is registered; once
// it returns successfully no further events are received from that channel.
// An unknown channel is a no-op.
func (w *WatchTower) UnregisterChannel(channel string) error {
	w.Mutex.Lock()
	defer w.Mutex.Unlock()

	if _, known := w.channels[channel]; !known {
		return nil
	}
	if err := w.listener.Unlisten(channel); err != nil {
		return err
	}
	delete(w.channels, channel)
	return nil
}

// UnregisterAllChannels unregisters all channels from the listener and no events
// will be received after this is called until a new channel is registered.
func (w *WatchTower) UnregisterAllChannels() error { w.Mutex.Lock() defer w.Mutex.Unlock() if err := w.listener.UnlistenAll(); err != nil { return err } w.channels = make(map[string]struct{}) return nil } // AddListener adds a new listener by initializing a gcp cloudsql dialer. func (w *WatchTower) AddListener(dbcfg pq.Dialer, connectionString string, minInterval, maxInterval time.Duration, log logr.Logger) *WatchTower { if minInterval == 0 { minInterval = minReconnectInterval } if maxInterval == 0 { maxInterval = maxReconnectInterval } dbListener := pq.NewDialListener(dbcfg, connectionString, minInterval, maxInterval, func(_ pq.ListenerEventType, err error) { if err != nil { log.Error(err, "An error occurred creating new dialer listener") } }) w.listener = dbListener return w } // SetProjectID sets the pubsub projectID. func (w *WatchTower) SetProjectID(projectID string) *WatchTower { w.projectID = projectID return w } // GetProjectID gets the pubsub projectID. func (w *WatchTower) GetProjectID() string { return w.projectID } // SetTopicID sets the pubsub topicID. func (w *WatchTower) SetTopicID(topicID string) *WatchTower { w.topicID = topicID return w } // GetTopicID gets the pubsub topicID. func (w *WatchTower) GetTopicID() string { return w.topicID } // AddPubsubClient sets the pubsub client. func (w *WatchTower) AddPubsubClient(pubsub pubsub.Publisher) *WatchTower { w.pubsubClient = pubsub return w } // Stream listens to the notification channel and spawns a new goroutine to handle each event. 
func (w *WatchTower) Stream(ctx context.Context, log logr.Logger) { wg := &sync.WaitGroup{} semaphore := make(chan struct{}, w.maxConcurrent) defer close(semaphore) for { select { case <-ctx.Done(): return case n := <-w.listener.Notify: wg.Add(1) go func(n *pq.Notification, log logr.Logger) { defer wg.Done() semaphore <- struct{}{} defer func() { <-semaphore }() if n == nil || n.Extra == "" { log.Info("ignoring empty message", "channel", n.Channel) return } ev, err := event.New().Unmarshal(n.Channel, n.Extra) if err != nil { log.Error(err, "An error occurred unmarshaling postgres event") return } payload, err := ev.ToByte() if err != nil { log.Error(err, "An error occurred converting postgres event to byte") return } bannerEdgeID, ok := ev.BannerEdgeID.(string) if !ok { bannerEdgeID = "" } ClusterInfraclusterEdgeID, ok := ev.ClusterInfraClusterEdgeID.(string) if !ok { ClusterInfraclusterEdgeID = "" } if err := w.pubsubClient.Send(ctx, w.topicID, payload, map[string]string{ "banner_edge_id": bannerEdgeID, "cluster_infra_cluster_edge_id": ClusterInfraclusterEdgeID, }); err != nil { log.Error(err, "An error occurred publishing postgres event to pubsub") return } log.Info("successfully published postgres event to pubsub", "operation", ev.Operation, "postgres_channel", n.Channel, "pubsub_topic", w.topicID) }(n, log) wg.Wait() case <-time.After(w.livelinessProbe): log.Info("no messages received within the liveliness probe limit. checking the postgres notification connection", "liveliness_probe_duration_in_minutes", w.livelinessProbe/60) go func(log logr.Logger) { switch err := w.listener.Ping(); err == nil { case true: log.Info("connection ping was successful, postgres notification connection is healthy") default: log.Error(err, "An error occurred pinging the postgres notification connection") } }(log) } } }