package server import ( "context" "encoding/json" "errors" "fmt" "time" "cloud.google.com/go/pubsub" guuid "github.com/google/uuid" "edge-infra.dev/pkg/edge/psqlinjector/metrics" "edge-infra.dev/pkg/f8n/kinform/model" "edge-infra.dev/pkg/lib/fog" ) // ErrBadMessage is returned by functions so the pubsub message can be Acked and produce an error log. // // For instance, json.Unmarshal errors should be joined by ErrBadMessage since since it can't be processed by the handler. // // Functions should use BadMessageErrorf to provide the reason the message is bad. var ErrBadMessage = fmt.Errorf("bad message") // ErrIgnoredMessage is returned by functions so the pubsub message can be Acked without producing an error log. // // Functions should use IgnoredMessageErrorf to provide the reason the message is ignored. var ErrIgnoredMessage = fmt.Errorf("ignored message") // zeroUUID will be seen if kinform is misconfigured. var zeroUUID = (guuid.UUID{}).String() func (k *PSQLInjector) handleMsg(ctx context.Context, msg *pubsub.Message) error { switch msg.Attributes[model.AttrPayloadType] { // Heartbeat case model.PayloadTypeClusterHeartbeat: var heartbeat model.ClusterHeartbeat if err := json.Unmarshal(msg.Data, &heartbeat); err != nil { return BadMessageErrorf("failed to unmarshal cluster heartbeat pubsub message: %w", err) } return k.handleClusterHeartbeat(ctx, heartbeat) // Event case model.PayloadTypeWatchedEvent: return k.handleEvent(ctx, msg) // Scrape case model.PayloadTypeScrapeMessage: var sm model.ScrapeMessage if err := json.Unmarshal(msg.Data, &sm); err != nil { return BadMessageErrorf("failed to unmarshal scraped objects pubsub message: %w", err) } return k.handleScrapeMessage(ctx, sm) // Field case model.PayloadTypeWatchedField: var wf model.WatchedField if err := json.Unmarshal(msg.Data, &wf); err != nil { return BadMessageErrorf("error unmarshaling model.WatchedField: %w", err) } return k.handleWatchedField(ctx, wf) } // this should really be a panic, but that would be annoying. return IgnoredMessageErrorf("payload types should be validated before reaching this function") } // HandleMsg handles messages sent by the kinform client. func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error { var successful = new(bool) var pt = msg.Attributes[model.AttrPayloadType] var ceid = msg.Attributes[model.AttrClusterUUID] // increase "messages_total" for every message received. defer metrics.IncMessagesTotal(pt, ceid, *successful) var log = fog.New().WithValues( "payload_type", pt, "cluster_uuid", ceid, ) ctx = fog.IntoContext(ctx, log) // when we get a bad message we should also log the attributes and data for analysis var badMessageLog = log.WithValues("attributes", msg.Attributes, "data", msg.Data) /* Validate common message values that every kinform message contains. */ switch pt { // Supported case model.PayloadTypeClusterHeartbeat: case model.PayloadTypeWatchedEvent: case model.PayloadTypeWatchedField: case model.PayloadTypeScrapeMessage: // Missing case "": metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "missing payload type attribute") return nil // Ignored default: // ignored messages are expected, so its logged as info, and marked successful metrics.IncIgnoredMessagesTotal(pt, ceid) log.Info("ignoring unsupported payload type") *successful = true return nil } if ceid == "" { metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "message is missing the cluster uuid attribute") return nil } else if ceid == zeroUUID { metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid is the zero uuid") return nil } else if _, err := guuid.Parse(ceid); err != nil { metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid not parsable", "parse_error", err) return nil } // Every payload for psqlinjector contains the Cluster field. // Although it's redundant, it allows functions to only care about the msg.Data. // Check that the json is valid and the Cluster in the data matches the attribute. if parsedCeid, err := parseClusterInJSON(msg.Data); err != nil { metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "failed to parse json in message data", "parse_error", err) return nil } else if parsedCeid != ceid { metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid does not match message data for Cluster field", "parsed_cluster_uuid", parsedCeid) return nil } /* Handle the payload. Then, based on the error, determine if the pubsub message should be acked or nacked by the subscriber. */ err := k.handleMsg(ctx, msg) if err != nil { switch { case errors.Is(err, ErrBadMessage): metrics.IncBadMessagesTotal(pt, ceid) badMessageLog.Error(err, "acking bad message") return nil case errors.Is(err, ErrIgnoredMessage): metrics.IncIgnoredMessagesTotal(pt, ceid) log.Info("ignoring message", "reason", err) *successful = true return nil default: // requeue messages that are valid, but failed due to intermittent outages. metrics.IncRequeueErrorsTotal(pt, ceid) log.Error(err, "failed to handle message") return err } } *successful = true return nil } func (k *PSQLInjector) handleScrapeMessage(ctx context.Context, sm model.ScrapeMessage) error { var log = fog.FromContext(ctx).WithValues( "start_time", sm.StartTime, "done_time", sm.DoneTime, ) if sm.StartTime.IsZero() { // sanity check return BadMessageErrorf("scrape message start time is zero") } select { case <-time.After(k.cfg.DelayScrapeMessageProcessing): // prevent database thrashing from out-of-order pubsub messages case <-ctx.Done(): return ctx.Err() } err := k.sql.DeleteOutdatedWatchedFieldObjects(ctx, sm) if err != nil { log.Error(err, "failed to delete outdated watched field objects") return err } log.Info("successfully handled scraped objects") return nil } // handleClusterHeartbeatMessage will update the status time in the db for the cluster func (k *PSQLInjector) handleClusterHeartbeat(ctx context.Context, heartbeat model.ClusterHeartbeat) error { var log = fog.FromContext(ctx).WithValues( "cluster_version_major", heartbeat.ClusterVersion.Major, "cluster_version_minor", heartbeat.ClusterVersion.Minor, "session_id", heartbeat.SessionID, ) err := k.sql.SetClusterHeartbeatTime(ctx, heartbeat.Timestamp, heartbeat.Cluster) if err != nil { log.Error(err, "failed to set cluster heartbeat in database") return err } log.Info("successfully updated cluster heartbeat") return nil } // handleEvent appends a new cluster event to the cluster_events table. func (k *PSQLInjector) handleEvent(_ context.Context, _ *pubsub.Message) error { return IgnoredMessageErrorf("not implemented yet") } func (k *PSQLInjector) handleWatchedField(ctx context.Context, wf model.WatchedField) error { var log = fog.FromContext(ctx).WithValues("watched_field", wf) if err := wf.Validate(); err != nil { return BadMessageErrorf("invalid model.WatchedField: %w", err) } switch wf.Event { case model.ResourceAdd, model.ResourceUpdate: err := k.sql.SetWatchedField(ctx, wf) if err != nil { log.Error(err, "failed to set watched field in database") return err } case model.ResourceDelete: err := k.sql.DeleteWatchedField(ctx, wf) if err != nil { log.Error(err, "failed to delete watched field in database") return err } default: return IgnoredMessageErrorf("unknown event in watched field: %q", wf.Event) } log.Info("successfully handled watched field") return nil } // parseClusterInJSON parses the Cluster field contained in msg.Data func parseClusterInJSON(data []byte) (string, error) { // The Cluster field in messages must equal the cluster uuid attribute. // This also determines if the msg.Data is valid JSON var common struct { Cluster *guuid.UUID `json:",omitempty"` } if err := json.Unmarshal(data, &common); err != nil { return "", BadMessageErrorf("message json is invalid: %w", err) } else if common.Cluster == nil { return "", BadMessageErrorf("missing the required '.Cluster' field in json data") } return common.Cluster.String(), nil } // BadMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrBadMessage)` func BadMessageErrorf(msg string, values ...any) error { return joinErr{ parent: ErrBadMessage, child: fmt.Errorf(msg, values...), } } // IgnoredMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrIgnoredMessage)` func IgnoredMessageErrorf(msg string, values ...any) error { return joinErr{ parent: ErrIgnoredMessage, child: fmt.Errorf(msg, values...), } } type joinErr struct { parent error child error } // Error only prints the child error since printing the parent is redundant in logs. func (err joinErr) Error() string { return err.child.Error() } // Unwrap lets the errors.Is function find the parent error, as well as its child error. func (err joinErr) Unwrap() []error { return []error{err.parent, err.child} }