1 package lighthouse
2
3 import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/go-logr/logr"
9 "github.com/lib/pq"
10
11 "edge-infra.dev/pkg/edge/lighthouse/event"
12 "edge-infra.dev/pkg/lib/gcp/pubsub"
13 )
14
const (
	// minReconnectInterval is the default minimum reconnect interval that
	// AddListener passes to the pq listener when the caller supplies none.
	minReconnectInterval = 5 * time.Second

	// maxReconnectInterval is the default maximum reconnect interval that
	// AddListener passes to the pq listener when the caller supplies none.
	maxReconnectInterval = 5 * time.Minute

	// DefaultLivelinessProbe is the liveliness probe duration that
	// NewWatchTower assigns to a new WatchTower.
	DefaultLivelinessProbe = 20 * time.Minute

	// DefaultMaxConcurrent is the concurrency limit that NewWatchTower
	// assigns to a new WatchTower.
	DefaultMaxConcurrent = 1000
)
29
30
// WatchTower holds the state needed to relay postgres notifications to
// pubsub: the pq listener, the set of registered channels, the publisher and
// its topic, and the tuning values read by Stream.
type WatchTower struct {
	// Mutex is embedded; RegisterChannel, ChannelExists, UnregisterChannel
	// and UnregisterAllChannels hold it while touching channels.
	sync.Mutex

	// channels is the set of channel names recorded by RegisterChannel.
	channels map[string]struct{}

	// listener is the postgres notification listener installed by AddListener.
	listener *pq.Listener

	// livelinessProbe is how long Stream waits for activity before pinging
	// the listener connection.
	livelinessProbe time.Duration

	// maxConcurrent is the capacity of the semaphore channel Stream creates.
	maxConcurrent int64

	// projectID is stored by SetProjectID and returned by GetProjectID; no
	// other use is visible in this file.
	projectID string

	// pubsubClient is the publisher Stream sends serialized events through.
	pubsubClient pubsub.Publisher

	// topicID is the topic Stream passes to pubsubClient.Send.
	topicID string

	// ticker is created by NewWatchTower with DefaultLivelinessProbe.
	// NOTE(review): nothing in this file reads or stops it — confirm whether
	// it is still needed.
	ticker *time.Ticker
}
50
51
// NotificationEvent groups an identifier, a channel name, an operation, a
// record payload and an acknowledgement flag.
// NOTE(review): nothing in this file constructs or reads this type, so the
// field meanings are inferred from their names only — confirm against the
// code that uses it.
type NotificationEvent struct {
	ID           string
	Channel      string
	Operation    string
	Record       interface{}
	Acknowledged bool
}
59
60
61 func NewWatchTower() *WatchTower {
62 return &WatchTower{
63 channels: make(map[string]struct{}),
64 livelinessProbe: DefaultLivelinessProbe,
65 maxConcurrent: DefaultMaxConcurrent,
66 ticker: time.NewTicker(DefaultLivelinessProbe),
67 }
68 }
69
70
// SetLivelinessProbe stores liveliness as the idle duration after which
// Stream pings the listener connection, and returns the receiver so calls can
// be chained. The value is stored as given, without validation.
func (w *WatchTower) SetLivelinessProbe(liveliness time.Duration) *WatchTower {
	w.livelinessProbe = liveliness
	return w
}
75
76
// GetLivelinessProbe returns the currently configured liveliness probe
// duration.
func (w *WatchTower) GetLivelinessProbe() time.Duration {
	return w.livelinessProbe
}
80
81
// SetMaxConcurrent stores max as the concurrency limit and returns the
// receiver so calls can be chained. The value is stored as given, without
// validation; Stream uses it as the capacity of its semaphore channel.
func (w *WatchTower) SetMaxConcurrent(max int64) *WatchTower {
	w.maxConcurrent = max
	return w
}
86
87
// GetMaxConcurrent returns the currently configured concurrency limit.
func (w *WatchTower) GetMaxConcurrent() int64 {
	return w.maxConcurrent
}
91
92
93 func (w *WatchTower) RegisterChannel(channel string) error {
94 w.Mutex.Lock()
95 defer w.Mutex.Unlock()
96 if _, exists := w.channels[channel]; !exists {
97 if err := w.listener.Listen(channel); err != nil {
98 return err
99 }
100 w.channels[channel] = struct{}{}
101 }
102 return nil
103 }
104
105
106 func (w *WatchTower) ChannelExists(channel string) bool {
107 w.Mutex.Lock()
108 defer w.Mutex.Unlock()
109 _, exists := w.channels[channel]
110 return exists
111 }
112
113
114
115 func (w *WatchTower) UnregisterChannel(channel string) error {
116 w.Mutex.Lock()
117 defer w.Mutex.Unlock()
118 if _, exists := w.channels[channel]; exists {
119 if err := w.listener.Unlisten(channel); err != nil {
120 return err
121 }
122 delete(w.channels, channel)
123 }
124 return nil
125 }
126
127
128
129 func (w *WatchTower) UnregisterAllChannels() error {
130 w.Mutex.Lock()
131 defer w.Mutex.Unlock()
132 if err := w.listener.UnlistenAll(); err != nil {
133 return err
134 }
135 w.channels = make(map[string]struct{})
136 return nil
137 }
138
139
140 func (w *WatchTower) AddListener(dbcfg pq.Dialer, connectionString string, minInterval, maxInterval time.Duration, log logr.Logger) *WatchTower {
141 if minInterval == 0 {
142 minInterval = minReconnectInterval
143 }
144 if maxInterval == 0 {
145 maxInterval = maxReconnectInterval
146 }
147 dbListener := pq.NewDialListener(dbcfg, connectionString, minInterval, maxInterval, func(_ pq.ListenerEventType, err error) {
148 if err != nil {
149 log.Error(err, "An error occurred creating new dialer listener")
150 }
151 })
152 w.listener = dbListener
153 return w
154 }
155
156
// SetProjectID stores projectID on the WatchTower and returns the receiver so
// calls can be chained.
func (w *WatchTower) SetProjectID(projectID string) *WatchTower {
	w.projectID = projectID
	return w
}
161
162
// GetProjectID returns the project ID stored by SetProjectID.
func (w *WatchTower) GetProjectID() string {
	return w.projectID
}
166
167
// SetTopicID stores topicID as the topic Stream publishes to and returns the
// receiver so calls can be chained.
func (w *WatchTower) SetTopicID(topicID string) *WatchTower {
	w.topicID = topicID
	return w
}
172
173
// GetTopicID returns the topic ID stored by SetTopicID.
func (w *WatchTower) GetTopicID() string {
	return w.topicID
}
177
178
// AddPubsubClient stores the publisher that Stream uses to send events and
// returns the receiver so calls can be chained.
//
// Note that the parameter is named pubsub and therefore shadows the pubsub
// package inside this function body.
func (w *WatchTower) AddPubsubClient(pubsub pubsub.Publisher) *WatchTower {
	w.pubsubClient = pubsub
	return w
}
183
184
185 func (w *WatchTower) Stream(ctx context.Context, log logr.Logger) {
186 wg := &sync.WaitGroup{}
187 semaphore := make(chan struct{}, w.maxConcurrent)
188 defer close(semaphore)
189 for {
190 select {
191 case <-ctx.Done():
192 return
193 case n := <-w.listener.Notify:
194 wg.Add(1)
195 go func(n *pq.Notification, log logr.Logger) {
196 defer wg.Done()
197 semaphore <- struct{}{}
198 defer func() { <-semaphore }()
199 if n == nil || n.Extra == "" {
200 log.Info("ignoring empty message", "channel", n.Channel)
201 return
202 }
203 ev, err := event.New().Unmarshal(n.Channel, n.Extra)
204 if err != nil {
205 log.Error(err, "An error occurred unmarshaling postgres event")
206 return
207 }
208 payload, err := ev.ToByte()
209 if err != nil {
210 log.Error(err, "An error occurred converting postgres event to byte")
211 return
212 }
213 bannerEdgeID, ok := ev.BannerEdgeID.(string)
214 if !ok {
215 bannerEdgeID = ""
216 }
217 ClusterInfraclusterEdgeID, ok := ev.ClusterInfraClusterEdgeID.(string)
218 if !ok {
219 ClusterInfraclusterEdgeID = ""
220 }
221 if err := w.pubsubClient.Send(ctx, w.topicID, payload, map[string]string{
222 "banner_edge_id": bannerEdgeID,
223 "cluster_infra_cluster_edge_id": ClusterInfraclusterEdgeID,
224 }); err != nil {
225 log.Error(err, "An error occurred publishing postgres event to pubsub")
226 return
227 }
228 log.Info("successfully published postgres event to pubsub", "operation", ev.Operation, "postgres_channel", n.Channel, "pubsub_topic", w.topicID)
229 }(n, log)
230 wg.Wait()
231 case <-time.After(w.livelinessProbe):
232 log.Info("no messages received within the liveliness probe limit. checking the postgres notification connection", "liveliness_probe_duration_in_minutes", w.livelinessProbe/60)
233 go func(log logr.Logger) {
234 switch err := w.listener.Ping(); err == nil {
235 case true:
236 log.Info("connection ping was successful, postgres notification connection is healthy")
237 default:
238 log.Error(err, "An error occurred pinging the postgres notification connection")
239 }
240 }(log)
241 }
242 }
243 }
244
View as plain text