...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/subscriber.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector

     1  package server
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  	"time"
     9  
    10  	"cloud.google.com/go/pubsub"
    11  	guuid "github.com/google/uuid"
    12  
    13  	"edge-infra.dev/pkg/edge/psqlinjector/metrics"
    14  	"edge-infra.dev/pkg/f8n/kinform/model"
    15  	"edge-infra.dev/pkg/lib/fog"
    16  )
    17  
    18  // ErrBadMessage is returned by functions so the pubsub message can be Acked and produce an error log.
    19  //
    20  // For instance, json.Unmarshal errors should be joined by ErrBadMessage since since it can't be processed by the handler.
    21  //
    22  // Functions should use BadMessageErrorf to provide the reason the message is bad.
    23  var ErrBadMessage = fmt.Errorf("bad message")
    24  
    25  // ErrIgnoredMessage is returned by functions so the pubsub message can be Acked without producing an error log.
    26  //
    27  // Functions should use IgnoredMessageErrorf to provide the reason the message is ignored.
    28  var ErrIgnoredMessage = fmt.Errorf("ignored message")
    29  
    30  // zeroUUID will be seen if kinform is misconfigured.
    31  var zeroUUID = (guuid.UUID{}).String()
    32  
    33  func (k *PSQLInjector) handleMsg(ctx context.Context, msg *pubsub.Message) error {
    34  	switch msg.Attributes[model.AttrPayloadType] {
    35  	// Heartbeat
    36  	case model.PayloadTypeClusterHeartbeat:
    37  		var heartbeat model.ClusterHeartbeat
    38  		if err := json.Unmarshal(msg.Data, &heartbeat); err != nil {
    39  			return BadMessageErrorf("failed to unmarshal cluster heartbeat pubsub message: %w", err)
    40  		}
    41  
    42  		return k.handleClusterHeartbeat(ctx, heartbeat)
    43  	// Event
    44  	case model.PayloadTypeWatchedEvent:
    45  		return k.handleEvent(ctx, msg)
    46  	// Scrape
    47  	case model.PayloadTypeScrapeMessage:
    48  		var sm model.ScrapeMessage
    49  		if err := json.Unmarshal(msg.Data, &sm); err != nil {
    50  			return BadMessageErrorf("failed to unmarshal scraped objects pubsub message: %w", err)
    51  		}
    52  		return k.handleScrapeMessage(ctx, sm)
    53  	// Field
    54  	case model.PayloadTypeWatchedField:
    55  		var wf model.WatchedField
    56  		if err := json.Unmarshal(msg.Data, &wf); err != nil {
    57  			return BadMessageErrorf("error unmarshaling model.WatchedField: %w", err)
    58  		}
    59  		return k.handleWatchedField(ctx, wf)
    60  	}
    61  
    62  	// this should really be a panic, but that would be annoying.
    63  	return IgnoredMessageErrorf("payload types should be validated before reaching this function")
    64  }
    65  
    66  // HandleMsg handles messages sent by the kinform client.
    67  func (k *PSQLInjector) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
    68  	var successful = new(bool)
    69  	var pt = msg.Attributes[model.AttrPayloadType]
    70  	var ceid = msg.Attributes[model.AttrClusterUUID]
    71  
    72  	// increase "messages_total" for every message received.
    73  	defer metrics.IncMessagesTotal(pt, ceid, *successful)
    74  
    75  	var log = fog.New().WithValues(
    76  		"payload_type", pt,
    77  		"cluster_uuid", ceid,
    78  	)
    79  	ctx = fog.IntoContext(ctx, log)
    80  
    81  	// when we get a bad message we should also log the attributes and data for analysis
    82  	var badMessageLog = log.WithValues("attributes", msg.Attributes, "data", msg.Data)
    83  
    84  	/*
    85  		Validate common message values that every kinform message contains.
    86  	*/
    87  
    88  	switch pt {
    89  	// Supported
    90  	case model.PayloadTypeClusterHeartbeat:
    91  	case model.PayloadTypeWatchedEvent:
    92  	case model.PayloadTypeWatchedField:
    93  	case model.PayloadTypeScrapeMessage:
    94  	// Missing
    95  	case "":
    96  		metrics.IncBadMessagesTotal(pt, ceid)
    97  		badMessageLog.Error(ErrBadMessage, "missing payload type attribute")
    98  		return nil
    99  	// Ignored
   100  	default:
   101  		// ignored messages are expected, so its logged as info, and marked successful
   102  		metrics.IncIgnoredMessagesTotal(pt, ceid)
   103  		log.Info("ignoring unsupported payload type")
   104  		*successful = true
   105  		return nil
   106  	}
   107  
   108  	if ceid == "" {
   109  		metrics.IncBadMessagesTotal(pt, ceid)
   110  		badMessageLog.Error(ErrBadMessage, "message is missing the cluster uuid attribute")
   111  		return nil
   112  	} else if ceid == zeroUUID {
   113  		metrics.IncBadMessagesTotal(pt, ceid)
   114  		badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid is the zero uuid")
   115  		return nil
   116  	} else if _, err := guuid.Parse(ceid); err != nil {
   117  		metrics.IncBadMessagesTotal(pt, ceid)
   118  		badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid not parsable", "parse_error", err)
   119  		return nil
   120  	}
   121  
   122  	// Every payload for psqlinjector contains the Cluster field.
   123  	// Although it's redundant, it allows functions to only care about the msg.Data.
   124  	// Check that the json is valid and the Cluster in the data matches the attribute.
   125  	if parsedCeid, err := parseClusterInJSON(msg.Data); err != nil {
   126  		metrics.IncBadMessagesTotal(pt, ceid)
   127  		badMessageLog.Error(ErrBadMessage, "failed to parse json in message data", "parse_error", err)
   128  		return nil
   129  	} else if parsedCeid != ceid {
   130  		metrics.IncBadMessagesTotal(pt, ceid)
   131  		badMessageLog.Error(ErrBadMessage, "message attribute for cluster uuid does not match message data for Cluster field", "parsed_cluster_uuid", parsedCeid)
   132  		return nil
   133  	}
   134  
   135  	/*
   136  		Handle the payload.
   137  
   138  		Then, based on the error, determine if the pubsub message should be acked or nacked by the subscriber.
   139  	*/
   140  	err := k.handleMsg(ctx, msg)
   141  
   142  	if err != nil {
   143  		switch {
   144  		case errors.Is(err, ErrBadMessage):
   145  			metrics.IncBadMessagesTotal(pt, ceid)
   146  			badMessageLog.Error(err, "acking bad message")
   147  			return nil
   148  		case errors.Is(err, ErrIgnoredMessage):
   149  			metrics.IncIgnoredMessagesTotal(pt, ceid)
   150  			log.Info("ignoring message", "reason", err)
   151  			*successful = true
   152  			return nil
   153  		default:
   154  			// requeue messages that are valid, but failed due to intermittent outages.
   155  			metrics.IncRequeueErrorsTotal(pt, ceid)
   156  			log.Error(err, "failed to handle message")
   157  			return err
   158  		}
   159  	}
   160  	*successful = true
   161  	return nil
   162  }
   163  
   164  func (k *PSQLInjector) handleScrapeMessage(ctx context.Context, sm model.ScrapeMessage) error {
   165  	var log = fog.FromContext(ctx).WithValues(
   166  		"start_time", sm.StartTime,
   167  		"done_time", sm.DoneTime,
   168  	)
   169  
   170  	if sm.StartTime.IsZero() {
   171  		// sanity check
   172  		return BadMessageErrorf("scrape message start time is zero")
   173  	}
   174  
   175  	select {
   176  	case <-time.After(k.cfg.DelayScrapeMessageProcessing):
   177  		// prevent database thrashing from out-of-order pubsub messages
   178  	case <-ctx.Done():
   179  		return ctx.Err()
   180  	}
   181  
   182  	err := k.sql.DeleteOutdatedWatchedFieldObjects(ctx, sm)
   183  	if err != nil {
   184  		log.Error(err, "failed to delete outdated watched field objects")
   185  		return err
   186  	}
   187  
   188  	log.Info("successfully handled scraped objects")
   189  	return nil
   190  }
   191  
   192  // handleClusterHeartbeatMessage will update the status time in the db for the cluster
   193  func (k *PSQLInjector) handleClusterHeartbeat(ctx context.Context, heartbeat model.ClusterHeartbeat) error {
   194  	var log = fog.FromContext(ctx).WithValues(
   195  		"cluster_version_major", heartbeat.ClusterVersion.Major,
   196  		"cluster_version_minor", heartbeat.ClusterVersion.Minor,
   197  		"session_id", heartbeat.SessionID,
   198  	)
   199  
   200  	err := k.sql.SetClusterHeartbeatTime(ctx, heartbeat.Timestamp, heartbeat.Cluster)
   201  	if err != nil {
   202  		log.Error(err, "failed to set cluster heartbeat in database")
   203  		return err
   204  	}
   205  
   206  	log.Info("successfully updated cluster heartbeat")
   207  	return nil
   208  }
   209  
   210  // handleEvent appends a new cluster event to the cluster_events table.
   211  func (k *PSQLInjector) handleEvent(_ context.Context, _ *pubsub.Message) error {
   212  	return IgnoredMessageErrorf("not implemented yet")
   213  }
   214  
   215  func (k *PSQLInjector) handleWatchedField(ctx context.Context, wf model.WatchedField) error {
   216  	var log = fog.FromContext(ctx).WithValues("watched_field", wf)
   217  	if err := wf.Validate(); err != nil {
   218  		return BadMessageErrorf("invalid model.WatchedField: %w", err)
   219  	}
   220  
   221  	switch wf.Event {
   222  	case model.ResourceAdd, model.ResourceUpdate:
   223  		err := k.sql.SetWatchedField(ctx, wf)
   224  		if err != nil {
   225  			log.Error(err, "failed to set watched field in database")
   226  			return err
   227  		}
   228  	case model.ResourceDelete:
   229  		err := k.sql.DeleteWatchedField(ctx, wf)
   230  		if err != nil {
   231  			log.Error(err, "failed to delete watched field in database")
   232  			return err
   233  		}
   234  	default:
   235  		return IgnoredMessageErrorf("unknown event in watched field: %q", wf.Event)
   236  	}
   237  
   238  	log.Info("successfully handled watched field")
   239  	return nil
   240  }
   241  
   242  // parseClusterInJSON parses the Cluster field contained in msg.Data
   243  func parseClusterInJSON(data []byte) (string, error) {
   244  	// The Cluster field in messages must equal the cluster uuid attribute.
   245  	// This also determines if the msg.Data is valid JSON
   246  	var common struct {
   247  		Cluster *guuid.UUID `json:",omitempty"`
   248  	}
   249  	if err := json.Unmarshal(data, &common); err != nil {
   250  		return "", BadMessageErrorf("message json is invalid: %w", err)
   251  	} else if common.Cluster == nil {
   252  		return "", BadMessageErrorf("missing the required '.Cluster' field in json data")
   253  	}
   254  	return common.Cluster.String(), nil
   255  }
   256  
   257  // BadMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrBadMessage)`
   258  func BadMessageErrorf(msg string, values ...any) error {
   259  	return joinErr{
   260  		parent: ErrBadMessage,
   261  		child:  fmt.Errorf(msg, values...),
   262  	}
   263  }
   264  
   265  // IgnoredMessageErrorf wraps fmt.Errorf and returns an error that can be checked with `errors.Is(err, ErrIgnoredMessage)`
   266  func IgnoredMessageErrorf(msg string, values ...any) error {
   267  	return joinErr{
   268  		parent: ErrIgnoredMessage,
   269  		child:  fmt.Errorf(msg, values...),
   270  	}
   271  }
   272  
   273  type joinErr struct {
   274  	parent error
   275  	child  error
   276  }
   277  
   278  // Error only prints the child error since printing the parent is redundant in logs.
   279  func (err joinErr) Error() string {
   280  	return err.child.Error()
   281  }
   282  
   283  // Unwrap lets the errors.Is function find the parent error, as well as its child error.
   284  func (err joinErr) Unwrap() []error {
   285  	return []error{err.parent, err.child}
   286  }
   287  

View as plain text