...

Source file src/edge-infra.dev/pkg/edge/monitoring/plank/plank.go

Documentation: edge-infra.dev/pkg/edge/monitoring/plank

     1  package plank
     2  
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/lib/db/postgres"
	"edge-infra.dev/pkg/lib/gcp/monitoring/alerting"
	"edge-infra.dev/pkg/lib/runtime/subscriber"
	"edge-infra.dev/pkg/lib/webhooks"
)
    17  
var (
	// insertSQL inserts one row into the incidents table. Its 13 positional
	// parameters must be supplied in this column order: incident_id,
	// started_at, ended_at, policy_name, project_id, cluster_id, url, state,
	// location, namespace, pod, container, metrics_labels.
	//
	// There is no ON CONFLICT clause, so re-inserting an existing incident
	// surfaces as an error from the driver if the table enforces uniqueness
	// (schema not visible here — NOTE(review): confirm).
	insertSQL = `
	INSERT INTO incidents(
		incident_id,
		started_at,
		ended_at,
		policy_name,
		project_id,
		cluster_id,
		url,
		state,
		location,
		namespace,
		pod,
		container,
		metrics_labels
	) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
	`
)
    37  
// Plank handles alerting incident messages: it stores each incident through
// dbClient and, when slack is set, forwards a summary to Slack.
//
// logger is used for error reporting. slack is left nil when New receives an
// empty Slack URL, in which case HandleMsg skips notifications. dbClient is
// the Postgres client used both for inserts and for the connection-status
// metric.
type Plank struct {
	logger   logr.Logger
	slack    webhooks.Slack
	dbClient postgres.Client
}
    43  
    44  func New(logger logr.Logger, slackURL string, client postgres.Client) *Plank {
    45  	p := &Plank{
    46  		logger:   logger,
    47  		dbClient: client,
    48  	}
    49  	p.initMetrics()
    50  
    51  	if slackURL != "" {
    52  		slackWebhook := webhooks.NewSlackWebhook(slackURL, func(o *webhooks.Options) {
    53  			o.Logger = &logger
    54  		})
    55  		p.slack = slackWebhook
    56  		p.initSlackMetrics()
    57  	}
    58  
    59  	return p
    60  }
    61  
    62  // Assert that Plank implements subscriber.Handler interface
    63  var _ subscriber.Handler = &Plank{}
    64  
    65  func (p *Plank) HandleMsg(_ context.Context, msg *pubsub.Message) error {
    66  	wrapper, err := alerting.Unmarshal(msg.Data)
    67  	if err != nil {
    68  		if strings.Contains(err.Error(), "unsupported") {
    69  			p.logger.Error(nil, "unsupported incident message", "data", string(msg.Data))
    70  		}
    71  		return err
    72  	}
    73  
    74  	inc := &wrapper.Incident
    75  	project := inc.Resource.Labels["project_id"]
    76  	cluster := inc.Resource.Labels["cluster"]
    77  	if cluster == "" { // todo - should consolidate; cluster is more common
    78  		cluster = inc.Resource.Labels["cluster_name"]
    79  	}
    80  
    81  	// optional resource labels
    82  	var location, ns, pod, container string
    83  	if inc.Resource.Labels["location"] != "" {
    84  		location = inc.Resource.Labels["location"]
    85  	}
    86  	if inc.Resource.Labels["namespace_name"] != "" {
    87  		ns = inc.Resource.Labels["namespace_name"]
    88  	}
    89  	if inc.Resource.Labels["pod_name"] != "" {
    90  		pod = inc.Resource.Labels["pod_name"]
    91  	}
    92  	if inc.Resource.Labels["container_name"] != "" {
    93  		container = inc.Resource.Labels["container_name"]
    94  	}
    95  
    96  	// metric labels vary much more so stick them all together and parse them out when reading
    97  	var metricsLables string
    98  	for k, v := range inc.Metric.Labels {
    99  		metricsLables = fmt.Sprintf("%s,%s=%s", metricsLables, k, v)
   100  	}
   101  	metricsLables = strings.TrimLeft(metricsLables, ",")
   102  
   103  	insertErr := p.dbClient.Insert(insertSQL, inc.ID, inc.StartedAt, inc.EndedAt, inc.PolicyName, project,
   104  		cluster, inc.URL, inc.State, location, ns, pod, container, metricsLables)
   105  	if insertErr != nil { // todo - maybe look into returning customer errors for unique cases
   106  		// TODO(help_wanted) - ignore duplicates. possible solution is to use use the pgx solution directly
   107  		// (https://github.com/jackc/pgx/blob/4fc4f9a60337af3bd7c6abdf6c71460712d112fc/large_objects_test.go#L157)
   108  		// or possibly build our own wrapper in the common lib and return customer error types
   109  		StorageInsertErrors.WithLabelValues().Inc()
   110  		p.logger.Error(insertErr, "failed to insert incident")
   111  		return insertErr
   112  	}
   113  	StorageInserts.WithLabelValues().Inc()
   114  
   115  	if p.slack != nil {
   116  		slackMsg := buildMsg(inc)
   117  		if serr := p.slack.SendMsg(slackMsg); serr != nil {
   118  			// todo - can look into adding status code if we actually want to go with this integration
   119  			SlackErrors.WithLabelValues().Inc()
   120  			return serr
   121  		}
   122  	}
   123  
   124  	return nil
   125  }
   126  
   127  func (p *Plank) initMetrics() {
   128  	StorageConnectionStatus.WithLabelValues().Set(0)
   129  	StorageInserts.WithLabelValues().Add(0)
   130  	StorageInsertErrors.WithLabelValues().Add(0)
   131  
   132  	go func() {
   133  		for {
   134  			if p.dbClient.IsConnected() {
   135  				StorageConnectionStatus.WithLabelValues().Set(1)
   136  			} else {
   137  				StorageConnectionStatus.WithLabelValues().Set(0)
   138  			}
   139  			time.Sleep(10 * time.Second)
   140  		}
   141  	}()
   142  }
   143  
   144  func (p *Plank) initSlackMetrics() {
   145  	SlackErrors.WithLabelValues().Add(0)
   146  }
   147  
   148  func buildMsg(inc *alerting.Incident) *webhooks.SlackMessage {
   149  	slackMessage := "> *Title:* " + inc.PolicyName + "\n>\n>" +
   150  		" *Project:* \n>" + inc.ScopingProjectID + "\n>\n>" +
   151  		" *Issue:* <" + inc.URL + " | " + inc.ID + ">\n>"
   152  
   153  	for k, v := range inc.Resource.Labels {
   154  		slackMessage = fmt.Sprintf("%s *%s:* %s\n", slackMessage, k, v)
   155  	}
   156  	slackMessage = fmt.Sprintf("%s\n", slackMessage)
   157  
   158  	return &webhooks.SlackMessage{
   159  		// ID:   inc.ID,
   160  		Text: slackMessage,
   161  	}
   162  }
   163  

View as plain text