package plank import ( "context" "fmt" "strings" "time" "cloud.google.com/go/pubsub" "github.com/go-logr/logr" "edge-infra.dev/pkg/lib/db/postgres" "edge-infra.dev/pkg/lib/gcp/monitoring/alerting" "edge-infra.dev/pkg/lib/runtime/subscriber" "edge-infra.dev/pkg/lib/webhooks" ) var ( insertSQL = ` INSERT INTO incidents( incident_id, started_at, ended_at, policy_name, project_id, cluster_id, url, state, location, namespace, pod, container, metrics_labels ) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ` ) type Plank struct { logger logr.Logger slack webhooks.Slack dbClient postgres.Client } func New(logger logr.Logger, slackURL string, client postgres.Client) *Plank { p := &Plank{ logger: logger, dbClient: client, } p.initMetrics() if slackURL != "" { slackWebhook := webhooks.NewSlackWebhook(slackURL, func(o *webhooks.Options) { o.Logger = &logger }) p.slack = slackWebhook p.initSlackMetrics() } return p } // Assert that Plank implements subscriber.Handler interface var _ subscriber.Handler = &Plank{} func (p *Plank) HandleMsg(_ context.Context, msg *pubsub.Message) error { wrapper, err := alerting.Unmarshal(msg.Data) if err != nil { if strings.Contains(err.Error(), "unsupported") { p.logger.Error(nil, "unsupported incident message", "data", string(msg.Data)) } return err } inc := &wrapper.Incident project := inc.Resource.Labels["project_id"] cluster := inc.Resource.Labels["cluster"] if cluster == "" { // todo - should consolidate; cluster is more common cluster = inc.Resource.Labels["cluster_name"] } // optional resource labels var location, ns, pod, container string if inc.Resource.Labels["location"] != "" { location = inc.Resource.Labels["location"] } if inc.Resource.Labels["namespace_name"] != "" { ns = inc.Resource.Labels["namespace_name"] } if inc.Resource.Labels["pod_name"] != "" { pod = inc.Resource.Labels["pod_name"] } if inc.Resource.Labels["container_name"] != "" { container = inc.Resource.Labels["container_name"] } // metric labels vary much more so stick them all together and parse them out when reading var metricsLables string for k, v := range inc.Metric.Labels { metricsLables = fmt.Sprintf("%s,%s=%s", metricsLables, k, v) } metricsLables = strings.TrimLeft(metricsLables, ",") insertErr := p.dbClient.Insert(insertSQL, inc.ID, inc.StartedAt, inc.EndedAt, inc.PolicyName, project, cluster, inc.URL, inc.State, location, ns, pod, container, metricsLables) if insertErr != nil { // todo - maybe look into returning customer errors for unique cases // TODO(help_wanted) - ignore duplicates. possible solution is to use use the pgx solution directly // (https://github.com/jackc/pgx/blob/4fc4f9a60337af3bd7c6abdf6c71460712d112fc/large_objects_test.go#L157) // or possibly build our own wrapper in the common lib and return customer error types StorageInsertErrors.WithLabelValues().Inc() p.logger.Error(insertErr, "failed to insert incident") return insertErr } StorageInserts.WithLabelValues().Inc() if p.slack != nil { slackMsg := buildMsg(inc) if serr := p.slack.SendMsg(slackMsg); serr != nil { // todo - can look into adding status code if we actually want to go with this integration SlackErrors.WithLabelValues().Inc() return serr } } return nil } func (p *Plank) initMetrics() { StorageConnectionStatus.WithLabelValues().Set(0) StorageInserts.WithLabelValues().Add(0) StorageInsertErrors.WithLabelValues().Add(0) go func() { for { if p.dbClient.IsConnected() { StorageConnectionStatus.WithLabelValues().Set(1) } else { StorageConnectionStatus.WithLabelValues().Set(0) } time.Sleep(10 * time.Second) } }() } func (p *Plank) initSlackMetrics() { SlackErrors.WithLabelValues().Add(0) } func buildMsg(inc *alerting.Incident) *webhooks.SlackMessage { slackMessage := "> *Title:* " + inc.PolicyName + "\n>\n>" + " *Project:* \n>" + inc.ScopingProjectID + "\n>\n>" + " *Issue:* <" + inc.URL + " | " + inc.ID + ">\n>" for k, v := range inc.Resource.Labels { slackMessage = fmt.Sprintf("%s *%s:* %s\n", slackMessage, k, v) } slackMessage = fmt.Sprintf("%s\n", slackMessage) return &webhooks.SlackMessage{ // ID: inc.ID, Text: slackMessage, } }