1 package plank
2
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/lib/db/postgres"
	"edge-infra.dev/pkg/lib/gcp/monitoring/alerting"
	"edge-infra.dev/pkg/lib/runtime/subscriber"
	"edge-infra.dev/pkg/lib/webhooks"
)
17
var (
	// insertSQL is the parameterized INSERT used by HandleMsg to store one
	// incident row. It has thirteen positional placeholders ($1..$13) that
	// must be bound in the same order as the column list below.
	insertSQL = `
INSERT INTO incidents(
incident_id,
started_at,
ended_at,
policy_name,
project_id,
cluster_id,
url,
state,
location,
namespace,
pod,
container,
metrics_labels
) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
`
)
37
// Plank handles incident messages: it stores each incident through dbClient
// and, when a Slack webhook is configured, forwards a summary to Slack.
type Plank struct {
	// logger receives the error logs emitted while handling messages.
	logger logr.Logger
	// slack is the optional Slack webhook; New only sets it when it is given
	// a non-empty URL, and HandleMsg skips Slack when it is nil.
	slack webhooks.Slack
	// dbClient runs the incident INSERT and is polled for connection status.
	dbClient postgres.Client
}
43
44 func New(logger logr.Logger, slackURL string, client postgres.Client) *Plank {
45 p := &Plank{
46 logger: logger,
47 dbClient: client,
48 }
49 p.initMetrics()
50
51 if slackURL != "" {
52 slackWebhook := webhooks.NewSlackWebhook(slackURL, func(o *webhooks.Options) {
53 o.Logger = &logger
54 })
55 p.slack = slackWebhook
56 p.initSlackMetrics()
57 }
58
59 return p
60 }
61
62
63 var _ subscriber.Handler = &Plank{}
64
65 func (p *Plank) HandleMsg(_ context.Context, msg *pubsub.Message) error {
66 wrapper, err := alerting.Unmarshal(msg.Data)
67 if err != nil {
68 if strings.Contains(err.Error(), "unsupported") {
69 p.logger.Error(nil, "unsupported incident message", "data", string(msg.Data))
70 }
71 return err
72 }
73
74 inc := &wrapper.Incident
75 project := inc.Resource.Labels["project_id"]
76 cluster := inc.Resource.Labels["cluster"]
77 if cluster == "" {
78 cluster = inc.Resource.Labels["cluster_name"]
79 }
80
81
82 var location, ns, pod, container string
83 if inc.Resource.Labels["location"] != "" {
84 location = inc.Resource.Labels["location"]
85 }
86 if inc.Resource.Labels["namespace_name"] != "" {
87 ns = inc.Resource.Labels["namespace_name"]
88 }
89 if inc.Resource.Labels["pod_name"] != "" {
90 pod = inc.Resource.Labels["pod_name"]
91 }
92 if inc.Resource.Labels["container_name"] != "" {
93 container = inc.Resource.Labels["container_name"]
94 }
95
96
97 var metricsLables string
98 for k, v := range inc.Metric.Labels {
99 metricsLables = fmt.Sprintf("%s,%s=%s", metricsLables, k, v)
100 }
101 metricsLables = strings.TrimLeft(metricsLables, ",")
102
103 insertErr := p.dbClient.Insert(insertSQL, inc.ID, inc.StartedAt, inc.EndedAt, inc.PolicyName, project,
104 cluster, inc.URL, inc.State, location, ns, pod, container, metricsLables)
105 if insertErr != nil {
106
107
108
109 StorageInsertErrors.WithLabelValues().Inc()
110 p.logger.Error(insertErr, "failed to insert incident")
111 return insertErr
112 }
113 StorageInserts.WithLabelValues().Inc()
114
115 if p.slack != nil {
116 slackMsg := buildMsg(inc)
117 if serr := p.slack.SendMsg(slackMsg); serr != nil {
118
119 SlackErrors.WithLabelValues().Inc()
120 return serr
121 }
122 }
123
124 return nil
125 }
126
127 func (p *Plank) initMetrics() {
128 StorageConnectionStatus.WithLabelValues().Set(0)
129 StorageInserts.WithLabelValues().Add(0)
130 StorageInsertErrors.WithLabelValues().Add(0)
131
132 go func() {
133 for {
134 if p.dbClient.IsConnected() {
135 StorageConnectionStatus.WithLabelValues().Set(1)
136 } else {
137 StorageConnectionStatus.WithLabelValues().Set(0)
138 }
139 time.Sleep(10 * time.Second)
140 }
141 }()
142 }
143
// initSlackMetrics adds zero to the SlackErrors counter so the metric is
// touched once at startup; New calls it only when a Slack webhook is configured.
func (p *Plank) initSlackMetrics() {
	SlackErrors.WithLabelValues().Add(0)
}
147
148 func buildMsg(inc *alerting.Incident) *webhooks.SlackMessage {
149 slackMessage := "> *Title:* " + inc.PolicyName + "\n>\n>" +
150 " *Project:* \n>" + inc.ScopingProjectID + "\n>\n>" +
151 " *Issue:* <" + inc.URL + " | " + inc.ID + ">\n>"
152
153 for k, v := range inc.Resource.Labels {
154 slackMessage = fmt.Sprintf("%s *%s:* %s\n", slackMessage, k, v)
155 }
156 slackMessage = fmt.Sprintf("%s\n", slackMessage)
157
158 return &webhooks.SlackMessage{
159
160 Text: slackMessage,
161 }
162 }
163
View as plain text