1 package server
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "cloud.google.com/go/pubsub"
11 guuid "github.com/google/uuid"
12
13 "edge-infra.dev/pkg/edge/psqlinjector/metrics"
14 "edge-infra.dev/pkg/f8n/kinform/model"
15 "edge-infra.dev/pkg/lib/fog"
16 )
17
18
19
20
21
22
var (
	// ErrBadMessage classifies messages that are malformed or fail validation
	// (missing attributes, unparsable JSON, mismatched cluster uuid, ...).
	// HandleMsg acks such messages by returning nil and counts them as bad.
	// Match it with errors.Is; construct wrapped instances with BadMessageErrorf.
	ErrBadMessage = errors.New("bad message")

	// ErrIgnoredMessage classifies well-formed messages that are deliberately
	// not processed, e.g. not-yet-implemented payloads or unknown watched-field
	// events. HandleMsg returns nil for these and counts them as ignored.
	// Match it with errors.Is; construct wrapped instances with IgnoredMessageErrorf.
	ErrIgnoredMessage = errors.New("ignored message")
)
29
30
31 var zeroUUID = (guuid.UUID{}).String()
32
// handleMsg decodes msg.Data according to the message's payload-type attribute
// and passes the decoded value to the matching handler.
//
// JSON decoding failures are returned as bad-message errors (see
// BadMessageErrorf). HandleMsg filters payload types before calling this, so
// an unrecognised type reaching the default case is reported as an ignored
// message rather than a failure.
func (k *PSQLInjector) handleMsg(ctx context.Context, msg *pubsub.Message) error {
	payloadType := msg.Attributes[model.AttrPayloadType]

	switch payloadType {
	case model.PayloadTypeClusterHeartbeat:
		var hb model.ClusterHeartbeat
		if decodeErr := json.Unmarshal(msg.Data, &hb); decodeErr != nil {
			return BadMessageErrorf("failed to unmarshal cluster heartbeat pubsub message: %w", decodeErr)
		}
		return k.handleClusterHeartbeat(ctx, hb)

	case model.PayloadTypeWatchedEvent:
		// Events are handed over undecoded.
		return k.handleEvent(ctx, msg)

	case model.PayloadTypeScrapeMessage:
		var scrape model.ScrapeMessage
		if decodeErr := json.Unmarshal(msg.Data, &scrape); decodeErr != nil {
			return BadMessageErrorf("failed to unmarshal scraped objects pubsub message: %w", decodeErr)
		}
		return k.handleScrapeMessage(ctx, scrape)

	case model.PayloadTypeWatchedField:
		var field model.WatchedField
		if decodeErr := json.Unmarshal(msg.Data, &field); decodeErr != nil {
			return BadMessageErrorf("error unmarshaling model.WatchedField: %w", decodeErr)
		}
		return k.handleWatchedField(ctx, field)

	default:
		return IgnoredMessageErrorf("payload types should be validated before reaching this function")
	}
}
65
66
67 func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
68 var successful = new(bool)
69 var pt = msg.Attributes[model.AttrPayloadType]
70 var ceid = msg.Attributes[model.AttrClusterUUID]
71
72
73 defer metrics.IncMessagesTotal(pt, ceid, *successful)
74
75 var log = fog.New().WithValues(
76 "payload_type", pt,
77 "cluster_uuid", ceid,
78 )
79 ctx = fog.IntoContext(ctx, log)
80
81
82 var badMessageLog = log.WithValues("attributes", msg.Attributes, "data", msg.Data)
83
84
87
88 switch pt {
89
90 case model.PayloadTypeClusterHeartbeat:
91 case model.PayloadTypeWatchedEvent:
92 case model.PayloadTypeWatchedField:
93 case model.PayloadTypeScrapeMessage:
94
95 case "":
96 metrics.IncBadMessagesTotal(pt, ceid)
97 badMessageLog.Error(ErrBadMessage, "missing payload type attribute")
98 return nil
99
100 default:
101
102 metrics.IncIgnoredMessagesTotal(pt, ceid)
103 log.Info("ignoring unsupported payload type")
104 *successful = true
105 return nil
106 }
107
108 if ceid == "" {
109 metrics.IncBadMessagesTotal(pt, ceid)
110 badMessageLog.Error(ErrBadMessage, "message is missing the cluster uuid attribute")
111 return nil
112 } else if ceid == zeroUUID {
113 metrics.IncBadMessagesTotal(pt, ceid)
114 badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid is the zero uuid")
115 return nil
116 } else if _, err := guuid.Parse(ceid); err != nil {
117 metrics.IncBadMessagesTotal(pt, ceid)
118 badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid not parsable", "parse_error", err)
119 return nil
120 }
121
122
123
124
125 if parsedCeid, err := parseClusterInJSON(msg.Data); err != nil {
126 metrics.IncBadMessagesTotal(pt, ceid)
127 badMessageLog.Error(ErrBadMessage, "failed to parse json in message data", "parse_error", err)
128 return nil
129 } else if parsedCeid != ceid {
130 metrics.IncBadMessagesTotal(pt, ceid)
131 badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid does not match message data for Cluster field", "parsed_cluster_uuid", parsedCeid)
132 return nil
133 }
134
135
140 err := k.handleMsg(ctx, msg)
141
142 if err != nil {
143 switch {
144 case errors.Is(err, ErrBadMessage):
145 metrics.IncBadMessagesTotal(pt, ceid)
146 badMessageLog.Error(err, "acking bad message")
147 return nil
148 case errors.Is(err, ErrIgnoredMessage):
149 metrics.IncIgnoredMessagesTotal(pt, ceid)
150 log.Info("ignoring message", "reason", err)
151 *successful = true
152 return nil
153 default:
154
155 metrics.IncRequeueErrorsTotal(pt, ceid)
156 log.Error(err, "failed to handle message")
157 return err
158 }
159 }
160 *successful = true
161 return nil
162 }
163
164 func (k *PSQLInjector) handleScrapeMessage(ctx context.Context, sm model.ScrapeMessage) error {
165 var log = fog.FromContext(ctx).WithValues(
166 "start_time", sm.StartTime,
167 "done_time", sm.DoneTime,
168 )
169
170 if sm.StartTime.IsZero() {
171
172 return BadMessageErrorf("scrape message start time is zero")
173 }
174
175 select {
176 case <-time.After(k.cfg.DelayScrapeMessageProcessing):
177
178 case <-ctx.Done():
179 return ctx.Err()
180 }
181
182 err := k.sql.DeleteOutdatedWatchedFieldObjects(ctx, sm)
183 if err != nil {
184 log.Error(err, "failed to delete outdated watched field objects")
185 return err
186 }
187
188 log.Info("successfully handled scraped objects")
189 return nil
190 }
191
192
193 func (k *PSQLInjector) handleClusterHeartbeat(ctx context.Context, heartbeat model.ClusterHeartbeat) error {
194 var log = fog.FromContext(ctx).WithValues(
195 "cluster_version_major", heartbeat.ClusterVersion.Major,
196 "cluster_version_minor", heartbeat.ClusterVersion.Minor,
197 "session_id", heartbeat.SessionID,
198 )
199
200 err := k.sql.SetClusterHeartbeatTime(ctx, heartbeat.Timestamp, heartbeat.Cluster)
201 if err != nil {
202 log.Error(err, "failed to set cluster heartbeat in database")
203 return err
204 }
205
206 log.Info("successfully updated cluster heartbeat")
207 return nil
208 }
209
210
211 func (k *PSQLInjector) handleEvent(_ context.Context, _ *pubsub.Message) error {
212 return IgnoredMessageErrorf("not implemented yet")
213 }
214
215 func (k *PSQLInjector) handleWatchedField(ctx context.Context, wf model.WatchedField) error {
216 var log = fog.FromContext(ctx).WithValues("watched_field", wf)
217 if err := wf.Validate(); err != nil {
218 return BadMessageErrorf("invalid model.WatchedField: %w", err)
219 }
220
221 switch wf.Event {
222 case model.ResourceAdd, model.ResourceUpdate:
223 err := k.sql.SetWatchedField(ctx, wf)
224 if err != nil {
225 log.Error(err, "failed to set watched field in database")
226 return err
227 }
228 case model.ResourceDelete:
229 err := k.sql.DeleteWatchedField(ctx, wf)
230 if err != nil {
231 log.Error(err, "failed to delete watched field in database")
232 return err
233 }
234 default:
235 return IgnoredMessageErrorf("unknown event in watched field: %q", wf.Event)
236 }
237
238 log.Info("successfully handled watched field")
239 return nil
240 }
241
242
243 func parseClusterInJSON(data []byte) (string, error) {
244
245
246 var common struct {
247 Cluster *guuid.UUID `json:",omitempty"`
248 }
249 if err := json.Unmarshal(data, &common); err != nil {
250 return "", BadMessageErrorf("message json is invalid: %w", err)
251 } else if common.Cluster == nil {
252 return "", BadMessageErrorf("missing the required '.Cluster' field in json data")
253 }
254 return common.Cluster.String(), nil
255 }
256
257
258 func BadMessageErrorf(msg string, values ...any) error {
259 return joinErr{
260 parent: ErrBadMessage,
261 child: fmt.Errorf(msg, values...),
262 }
263 }
264
265
266 func IgnoredMessageErrorf(msg string, values ...any) error {
267 return joinErr{
268 parent: ErrIgnoredMessage,
269 child: fmt.Errorf(msg, values...),
270 }
271 }
272
// joinErr attaches a classification sentinel (parent) to a detailed error
// (child). Only the child's text is shown by Error(), but Unwrap exposes both,
// so errors.Is/As can match either one.
type joinErr struct {
	parent, child error
}

// Error returns the child's message; the parent is a category, not detail.
func (e joinErr) Error() string { return e.child.Error() }

// Unwrap returns both errors, in parent-then-child order, for the multi-error
// traversal used by errors.Is and errors.As.
func (e joinErr) Unwrap() []error { return []error{e.parent, e.child} }
287
View as plain text