...

Source file src/edge-infra.dev/pkg/edge/chariot/daemon.go

Documentation: edge-infra.dev/pkg/edge/chariot

     1  package chariot
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  	"io"
     9  	"net/http"
    10  	"os"
    11  	"sync"
    12  	"time"
    13  
    14  	"cloud.google.com/go/pubsub"
    15  	"cloud.google.com/go/storage"
    16  	"google.golang.org/api/googleapi"
    17  
    18  	helmApi "github.com/fluxcd/helm-controller/api/v2beta1"
    19  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2"
    20  
    21  	"edge-infra.dev/pkg/edge/constants"
    22  	eamodel "edge-infra.dev/pkg/edge/edgeagent/model"
    23  	"edge-infra.dev/pkg/lib/mqtt"
    24  )
    25  
// Severity values written to the "severity" field of the JSON log entries
// emitted by the log helpers in this file.
const (
	sevError   = "error"
	sevWarning = "warning"
	sevInfo    = "info"
)
    31  
// loggerOutput is an io.ReadWriter protected by a mutex. The log helpers in
// this file hold the mutex while encoding an entry, which prevents garbled
// (interleaved) log output.
var loggerOutput = struct {
	sync.Mutex // locked before rw is written to.
	rw         io.ReadWriter
}{
	rw: os.Stdout, // log entries go to standard output by default.
}
    39  
// Option functions set private fields in the Daemon. An Option returns an
// error when the value it was built with cannot be used; NewDaemon stops at
// the first Option that fails.
type Option func(*Daemon) error
    42  
    43  // OptionGoogleCloudStorage sets the Google Cloud Storage client in the Daemon.
    44  func OptionGoogleCloudStorage(client *storage.Client) Option {
    45  	return func(d *Daemon) error {
    46  		if client == nil {
    47  			return fmt.Errorf("Google Cloud Storage client must not be nil")
    48  		}
    49  		d.storer = NewGoogleCloudStorage(client)
    50  		return nil
    51  	}
    52  }
    53  
    54  // OptionPubSubResponsePublisher sets the Publisher that handles chariot responses.
    55  func OptionPubSubResponsePublisher(p Publisher) Option {
    56  	return func(d *Daemon) error {
    57  		if p == nil {
    58  			return fmt.Errorf("Publisher must not be nil")
    59  		}
    60  		d.response = p
    61  		return nil
    62  	}
    63  }
    64  
    65  // OptionPubSubReceiver sets the PubSub interface in the Daemon.
    66  func OptionPubSubReceiver(ipsr IPubSubReceiver) Option {
    67  	return func(d *Daemon) error {
    68  		if ipsr == nil {
    69  			return fmt.Errorf("IPubSubReceiver must not be nil")
    70  		}
    71  		d.pubSubReceiver = ipsr
    72  		return nil
    73  	}
    74  }
    75  
// Daemon is the Chariot process that pulls and processes PubSub messages.
// Build one with NewDaemon and start it with Run.
type Daemon struct {
	// this mutex protects the pubSubReceiverHandle method.
	// NOTE(review): no code visible in this file locks it — confirm it is still needed.
	sync.Mutex

	// pubSubReceiver delivers incoming messages; Run calls its Receive method.
	// Set with OptionPubSubReceiver.
	pubSubReceiver IPubSubReceiver

	// storer performs the Put and Delete storage operations.
	// Set with OptionGoogleCloudStorage.
	storer *GoogleCloudStorage

	// response publishes chariot responses. Set with
	// OptionPubSubResponsePublisher; Run substitutes NopPublisher when it is nil.
	response Publisher

	// metrics is created by NewDaemon via NewMetrics.
	metrics *Metrics
}
    88  
    89  // NewDaemon returns a Daemon with the provided options set.
    90  func NewDaemon(options ...Option) (*Daemon, error) {
    91  	var d = new(Daemon)
    92  	for _, opt := range options {
    93  		if err := opt(d); err != nil {
    94  			return nil, fmt.Errorf("Error setting option: %w", err)
    95  		}
    96  	}
    97  	d.metrics = NewMetrics()
    98  	return d, nil
    99  }
   100  
   101  // Run starts the daemon and blocks until an error occurs.
   102  func (d *Daemon) Run(ctx context.Context) error {
   103  	if d.response == nil {
   104  		d.response = NopPublisher{}
   105  	}
   106  
   107  	err := d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle)
   108  	return fmt.Errorf("Error returned by Receive function: %w", err)
   109  }
   110  
   111  // pubSubReceiverHandle is called for each PubSub message.
   112  func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm IPubSubMessage) {
   113  	go d.handleRequest(ctx, ipsm)
   114  }
   115  
   116  // performStorageOperation panics if the request's operation is invalid.
   117  func (d *Daemon) performStorageOperation(ctx context.Context, req Request) (si StorageInfo, err error) {
   118  	var objects []StorageObject
   119  	if objects, err = req.StorageObjects(); err != nil {
   120  		return si, fmt.Errorf("Error generating storage objects: %w", err)
   121  	}
   122  
   123  	switch req.Operation {
   124  	case OperationCreate:
   125  		return d.storer.Put(ctx, objects...)
   126  	case OperationDelete:
   127  		return d.storer.Delete(ctx, objects...)
   128  	default:
   129  		const panicmsg = "Requests should have been validated before performStorageOperation is called. Got unknown operation: %q"
   130  		panic(fmt.Sprintf(panicmsg, req.Operation))
   131  	}
   132  }
   133  
   134  // publishEventNotification takes a request and sends a notification to the
   135  // banner topic for edge-agent to notify stores that configuration has changed
   136  func (d *Daemon) publishEventNotification(ctx context.Context, req Request, objects ...StorageObject) error {
   137  	switch req.Operation {
   138  	case OperationCreate:
   139  		return d.notify(ctx, req, objects...)
   140  	case OperationDelete:
   141  		//currently we do not consider delete events
   142  	default:
   143  		return nil
   144  	}
   145  	return nil
   146  }
   147  
   148  func (d *Daemon) notify(ctx context.Context, req Request, objects ...StorageObject) error {
   149  	//create notification pubsub message to send to edge agent
   150  	notification := eamodel.NewNotificationMessage().
   151  		SetActor(req.Owner).
   152  		SetClusterEdgeID(req.Cluster)
   153  
   154  	// boolean to indicate if kustomization needs to sync, only one sync message needs to be sent for n objects
   155  	syncKustomization := false
   156  
   157  	for _, obj := range objects {
   158  		var event *eamodel.Event
   159  
   160  		//StorageObject.Content is the string representation of an object,
   161  		//converting it back to bytes to read object metadata
   162  		obj, err := ParseYamlGVKNN([]byte(obj.Content))
   163  		if err != nil {
   164  			return fmt.Errorf("error parsing storage object for notifier: %w", err)
   165  		}
   166  		switch obj.Kind {
   167  		case helmApi.HelmReleaseKind:
   168  			event = eamodel.BuildEdgeAgentEvent(obj.Metadata.Name, helmApi.HelmReleaseKind, constants.FluxSystem, eamodel.EventTypeReconcile)
   169  		// for all other synced object we want to reconile the chariot-sync kustomization
   170  		default:
   171  			if syncKustomization {
   172  				continue
   173  			}
   174  			event = eamodel.BuildEdgeAgentEvent(eamodel.ChariotSync, kustomizeApi.KustomizationKind, constants.FluxEdgeNamespace, eamodel.EventTypeReconcile)
   175  			//set value to true to not create any more kustomization sync events
   176  			syncKustomization = true
   177  		}
   178  		if event != nil {
   179  			notification.AddEvent(*event)
   180  		}
   181  	}
   182  
   183  	if len(notification.Events) > 0 {
   184  		// create pubsub client for the banner project the notification message will be published to
   185  		psNotifierClient, err := pubsub.NewClient(ctx, req.Banner)
   186  		if err != nil {
   187  			return fmt.Errorf("error creating pubsub client for notifications: %w", err)
   188  		}
   189  		// defer closing pubsub client once message has been published
   190  		defer psNotifierClient.Close()
   191  
   192  		ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, psNotifierClient, req.Banner, eamodel.EdgeAgentTopicAndOwner)
   193  		if err != nil {
   194  			return fmt.Errorf("error fetching pubsub topic for notifications: %w", err)
   195  		}
   196  		// defer cleanup of topic goroutines and resources once message has been published
   197  		defer ipsnotifier.Stop()
   198  
   199  		// publish notification
   200  		return ipsnotifier.Publish(ctx, notification)
   201  	}
   202  	return nil
   203  }
   204  
   205  // handleRequest takes a pubsub message and performs the operation.
   206  func (d *Daemon) handleRequest(ctx context.Context, ipsm IPubSubMessage) {
   207  	var ackdeadline, cancel = context.WithTimeout(ctx, AckDeadline)
   208  	defer cancel()
   209  
   210  	// Create chariot pubsub response that satisfies the IPubSubMessage interface.
   211  	var resp = &chariotResponseMessage{
   212  		requestID: ipsm.ID(),
   213  	}
   214  
   215  	// Log metrics for every request.
   216  	var req Request
   217  	defer func() {
   218  		var successful = resp.err == nil
   219  		d.metrics.IncRequestsTotal(req, successful)
   220  
   221  		// record metrics about errors.
   222  		if !successful {
   223  			d.metrics.IncErrorsTotal(req)
   224  			if errors.Is(resp.err, context.DeadlineExceeded) {
   225  				d.metrics.IncDeadlineExceededTotal(AckDeadline)
   226  			}
   227  			var gerr *googleapi.Error
   228  			if errors.As(resp.err, &gerr) {
   229  				d.metrics.IncGoogleAPIErrorsTotal(resp.err)
   230  			}
   231  		}
   232  	}()
   233  
   234  	// Decode the json Chariot request
   235  	if err := json.Unmarshal(ipsm.Data(), &req); err != nil {
   236  		// This error can't be retried.
   237  		resp.err = fmt.Errorf("Error decoding request JSON: %w", err)
   238  		logRequestJSONError(ipsm, resp.err)
   239  
   240  		// Send an error response.
   241  		if err = d.response.Publish(ackdeadline, resp); err != nil {
   242  			logResponseError(ipsm.ID(), err)
   243  		}
   244  		ipsm.Ack() // This error can't be retried.
   245  		return
   246  	}
   247  
   248  	// Set the owner and operation fields in the response.
   249  	// The owner field is used to identify pubsub response recipients.
   250  	resp.owner = req.Owner
   251  	resp.operation = req.Operation
   252  
   253  	if err := req.Validate(); err != nil {
   254  		resp.err = fmt.Errorf("Request validation error: %w", err)
   255  		logRequestJSONError(ipsm, resp.err)
   256  
   257  		// Send an error response
   258  		if err = d.response.Publish(ctx, resp); err != nil {
   259  			logResponseError(ipsm.ID(), err)
   260  		}
   261  		ipsm.Ack() // This error can't be retried.
   262  		return
   263  	}
   264  
   265  	// Perform the storage operation
   266  	var si StorageInfo
   267  	si, resp.err = d.performStorageOperation(ackdeadline, req)
   268  
   269  	// Log & record metrics every time performStorageOperation is called. Do it before checking errors.
   270  	d.metrics.IncStorageOperationsTotal(req, si)
   271  	logStorageInfo(ipsm.ID(), si)
   272  
   273  	if resp.err != nil {
   274  		var reason = fmt.Sprintf("Storage operation failed: %v", resp.err)
   275  		logRetryEvent(ipsm, reason)
   276  
   277  		// Log more info about the error when able.
   278  		// The error can be both a google api error and a deadline error thanks to the errorsnode package.
   279  		if errors.Is(resp.err, context.DeadlineExceeded) {
   280  			logAckDeadlineExpired(ipsm, "Ack deadline reached while performing storage operation")
   281  		}
   282  		if IsGoogleAPIError(resp.err) {
   283  			logGoogleAPIErr(ipsm, resp.err)
   284  		}
   285  		// Nack without publishing a response since the request will be retried.
   286  		ipsm.Nack()
   287  		return
   288  	}
   289  
   290  	// If we reach here, we can publish the response
   291  	if err := d.response.Publish(ctx, resp); err != nil {
   292  		logResponseError(ipsm.ID(), err)
   293  	}
   294  	ipsm.Ack()
   295  
   296  	if req.Notify {
   297  		//publish to edge agent topic for every successful storage put opteration
   298  		if err := d.publishEventNotification(ackdeadline, req, si.ObjectsPut...); err != nil {
   299  			logNotificationError(resp.requestID, err)
   300  		}
   301  	}
   302  }
   303  
   304  const LogAckDeadlineExpiredMessage = "Ack deadline expired"
   305  
   306  type LogAckDeadlineExpiredMessageObject struct {
   307  	Message  string `json:"message"`
   308  	Severity string `json:"severity"`
   309  
   310  	RequestID       string `json:"pubsub_request_id"`
   311  	TimeoutDuration string `json:"timeout_duration"`
   312  
   313  	Why string `json:"why,omitempty"`
   314  }
   315  
   316  func logAckDeadlineExpired(ipsm IPubSubMessage, why string) {
   317  	var l = LogAckDeadlineExpiredMessageObject{
   318  		Message:         LogAckDeadlineExpiredMessage,
   319  		Severity:        sevError,
   320  		RequestID:       ipsm.ID(),
   321  		TimeoutDuration: fmt.Sprintf("%v", AckDeadline),
   322  		Why:             why,
   323  	}
   324  
   325  	loggerOutput.Lock()
   326  	defer loggerOutput.Unlock()
   327  	//nolint:errcheck
   328  	json.NewEncoder(loggerOutput.rw).Encode(l)
   329  }
   330  
   331  type LogRetryEventMessageObject struct {
   332  	Message  string `json:"message"`
   333  	Severity string `json:"severity"`
   334  
   335  	RequestID string `json:"pubsub_request_id"`
   336  
   337  	Reason string `json:"retry_reason"`
   338  }
   339  
   340  const LogRetryEventMessage = "Retry chariot request"
   341  
   342  func logRetryEvent(ipsm IPubSubMessage, reason string) {
   343  	var l = LogRetryEventMessageObject{
   344  		Message:   LogRetryEventMessage,
   345  		Severity:  sevInfo,
   346  		RequestID: ipsm.ID(),
   347  		Reason:    reason,
   348  	}
   349  
   350  	loggerOutput.Lock()
   351  	defer loggerOutput.Unlock()
   352  	//nolint:errcheck
   353  	json.NewEncoder(loggerOutput.rw).Encode(l)
   354  }
   355  
   356  type LogRequestJSONErrorMessageObject struct {
   357  	Message  string `json:"message"`
   358  	Severity string `json:"severity"`
   359  
   360  	Err string `json:"error"`
   361  
   362  	Data      []byte `json:"pubsub_data"`
   363  	RequestID string `json:"pubsub_request_id"`
   364  }
   365  
   366  const LogRequestJSONErrorMessage = "Error parsing request JSON"
   367  
   368  func logRequestJSONError(ipsm IPubSubMessage, err error) {
   369  	var lrjemo = LogRequestJSONErrorMessageObject{
   370  		Message:   LogRequestJSONErrorMessage,
   371  		Severity:  sevError,
   372  		Err:       err.Error(),
   373  		Data:      ipsm.Data(),
   374  		RequestID: ipsm.ID(),
   375  	}
   376  
   377  	loggerOutput.Lock()
   378  	defer loggerOutput.Unlock()
   379  	//nolint:errcheck
   380  	json.NewEncoder(loggerOutput.rw).Encode(lrjemo)
   381  }
   382  
// SuccessLogMessage and FailureLogMessage are log message texts.
// NOTE(review): neither is referenced in the visible part of this file;
// presumably they are the Message values of PubSubLogMessageObject — confirm.
const SuccessLogMessage = "Successfully processed pubsub message"
const FailureLogMessage = "Failed to process pubsub message"

// PubSubLogMessageObject is logged when an Ack occurs.
type PubSubLogMessageObject struct {
	Severity string `json:"severity"` // "info", "error"
	Message  string `json:"message"`  // The log message

	// Err is an error that caused Chariot to not process the request.
	Err string `json:"error,omitempty"`

	// PubSub info
	ID         string            `json:"pubsub_id"`
	Data       []byte            `json:"pubsub_data,omitempty"`
	Attributes map[string]string `json:"pubsub_attributes,omitempty"`
	// NOTE: encoding/json never treats a struct as empty, so omitempty has no
	// effect on PublishTime; a zero time.Time is still written.
	PublishTime     time.Time `json:"pubsub_publish_time,omitempty"`
	DeliveryAttempt int       `json:"pubsub_delivery_attempt,omitempty"`
	OrderingKey     string    `json:"pubsub_ordering_key,omitempty"`
}
   402  
   403  const ResponseErrorLogMessage = "Error publishing pubsub response"
   404  
   405  type ResponseErrorLogMessageObject struct {
   406  	Severity string `json:"severity"` // "error"
   407  	Message  string `json:"message"`  // set to ResponseErrorLogMessage
   408  
   409  	// RequestID is logged to tie StorageInfo logs to their requests.
   410  	RequestID string `json:"request_pubsub_id"`
   411  
   412  	Error string `json:"error"` // The error returned by the Publish method.
   413  }
   414  
   415  func logResponseError(reqID string, err error) {
   416  	loggerOutput.Lock()
   417  	defer loggerOutput.Unlock()
   418  
   419  	//nolint:errcheck
   420  	json.NewEncoder(loggerOutput.rw).Encode(ResponseErrorLogMessageObject{
   421  		Severity:  sevError,
   422  		Message:   ResponseErrorLogMessage,
   423  		RequestID: reqID,
   424  		Error:     err.Error(),
   425  	})
   426  }
   427  
   428  const LogNotificationErrorMessage = "Error sending notification message"
   429  
   430  type NotificationErrorLogMessageObject struct {
   431  	Severity string `json:"severity"`
   432  	Message  string `json:"message"` // set to LogNotificationErrorMessage
   433  	// RequestID is logged to tie Notification logs to their requests.
   434  	RequestID string `json:"request_pubsub_id"`
   435  	Error     string `json:"error"` // error returned by the Publish method
   436  }
   437  
   438  func logNotificationError(reqID string, err error) {
   439  	loggerOutput.Lock()
   440  	defer loggerOutput.Unlock()
   441  
   442  	//nolint:errcheck
   443  	json.NewEncoder(loggerOutput.rw).Encode(NotificationErrorLogMessageObject{
   444  		Severity:  sevError,
   445  		Message:   LogNotificationErrorMessage,
   446  		RequestID: reqID,
   447  		Error:     err.Error(),
   448  	})
   449  }
   450  
   451  const StorageInfoLogMessage = "Storage Info"
   452  
   453  type StorageInfoLogMessageObject struct {
   454  	Severity string `json:"severity"` // "info", "error"
   455  	Message  string `json:"message"`  // The log message
   456  
   457  	// RequestID is logged to tie StorageInfo logs to their requests.
   458  	RequestID string `json:"request_pubsub_id"`
   459  
   460  	StorageInfo StorageInfo `json:"storage_info,omitempty"`
   461  }
   462  
   463  func logStorageInfo(reqID string, si StorageInfo) {
   464  	var sev = sevInfo
   465  	var msg = StorageInfoLogMessage
   466  
   467  	if len(si.ObjectsDoNotExist) > 0 {
   468  		sev = sevWarning
   469  	}
   470  	if len(si.Errors) > 0 {
   471  		sev = sevError
   472  	} else {
   473  		// No errors, so we **minimize data logged**
   474  		for i := range si.ObjectsPut {
   475  			si.ObjectsPut[i].Content = ""
   476  		}
   477  		for i := range si.ObjectsDeleted {
   478  			si.ObjectsDeleted[i].Content = ""
   479  		}
   480  	}
   481  
   482  	loggerOutput.Lock()
   483  	defer loggerOutput.Unlock()
   484  	//nolint:errcheck
   485  	json.NewEncoder(loggerOutput.rw).Encode(StorageInfoLogMessageObject{
   486  		Severity:    sev,
   487  		Message:     msg,
   488  		RequestID:   reqID,
   489  		StorageInfo: si,
   490  	})
   491  }
   492  
   493  func IsGoogleAPIError(err error) bool {
   494  	var gerr *googleapi.Error
   495  	return errors.As(err, &gerr)
   496  }
   497  
   498  const LogGoogleAPIErrMessage = "Googleapi Error"
   499  
   500  type LogGoogleAPIErrMessageObject struct {
   501  	Severity string `json:"severity"` // error
   502  	Message  string `json:"message"`
   503  
   504  	RequestID string `json:"pubsub_request_id"`
   505  
   506  	Error string `json:"error"`
   507  
   508  	Code int `json:"response_status_code"`
   509  
   510  	Header http.Header `json:"response_header"`
   511  
   512  	Body string `json:"response_body"`
   513  
   514  	Errors []googleapi.ErrorItem `json:"errors"`
   515  }
   516  
   517  func logGoogleAPIErr(ipsm IPubSubMessage, err error) {
   518  	var gerr *googleapi.Error
   519  	if ok := errors.As(err, &gerr); !ok {
   520  		return
   521  	}
   522  
   523  	loggerOutput.Lock()
   524  	defer loggerOutput.Unlock()
   525  
   526  	var l = LogGoogleAPIErrMessageObject{
   527  		Severity:  sevError,
   528  		Message:   LogGoogleAPIErrMessage,
   529  		RequestID: ipsm.ID(),
   530  		Error:     err.Error(),
   531  		Code:      gerr.Code,
   532  		Header:    gerr.Header, // TODO for security, remove header from logs after confirming/denying `Retry-After` is present,
   533  		Body:      gerr.Body,
   534  		Errors:    gerr.Errors,
   535  	}
   536  
   537  	//nolint:errcheck
   538  	json.NewEncoder(loggerOutput.rw).Encode(l)
   539  }
   540  

View as plain text