package chariot import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "os" "sync" "time" "cloud.google.com/go/pubsub" "cloud.google.com/go/storage" "google.golang.org/api/googleapi" helmApi "github.com/fluxcd/helm-controller/api/v2beta1" kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2" "edge-infra.dev/pkg/edge/constants" eamodel "edge-infra.dev/pkg/edge/edgeagent/model" "edge-infra.dev/pkg/lib/mqtt" ) const ( sevError = "error" sevWarning = "warning" sevInfo = "info" ) // loggerOutput is a io.ReadWriter protected by a mutex. It prevents garbled log output. var loggerOutput = struct { sync.Mutex // called before rw is written to. rw io.ReadWriter }{ rw: os.Stdout, } // Option functions set private fields in the Daemon. type Option func(*Daemon) error // OptionGoogleCloudStorage sets the Google Cloud Storage client in the Daemon. func OptionGoogleCloudStorage(client *storage.Client) Option { return func(d *Daemon) error { if client == nil { return fmt.Errorf("Google Cloud Storage client must not be nil") } d.storer = NewGoogleCloudStorage(client) return nil } } // OptionPubSubResponsePublisher sets the Publisher that handles chariot responses. func OptionPubSubResponsePublisher(p Publisher) Option { return func(d *Daemon) error { if p == nil { return fmt.Errorf("Publisher must not be nil") } d.response = p return nil } } // OptionPubSubReceiver sets the PubSub interface in the Daemon. func OptionPubSubReceiver(ipsr IPubSubReceiver) Option { return func(d *Daemon) error { if ipsr == nil { return fmt.Errorf("IPubSubReceiver must not be nil") } d.pubSubReceiver = ipsr return nil } } // Daemon is the Chariot process that pulls and processes PubSub messages. type Daemon struct { sync.Mutex // this mutex protects the pubSubReceiverHandle method. pubSubReceiver IPubSubReceiver storer *GoogleCloudStorage response Publisher metrics *Metrics } // NewDaemon returns a Daemon with the provided options set. func NewDaemon(options ...Option) (*Daemon, error) { var d = new(Daemon) for _, opt := range options { if err := opt(d); err != nil { return nil, fmt.Errorf("Error setting option: %w", err) } } d.metrics = NewMetrics() return d, nil } // Run starts the daemon and blocks until an error occurs. func (d *Daemon) Run(ctx context.Context) error { if d.response == nil { d.response = NopPublisher{} } err := d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle) return fmt.Errorf("Error returned by Receive function: %w", err) } // pubSubReceiverHandle is called for each PubSub message. func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm IPubSubMessage) { go d.handleRequest(ctx, ipsm) } // performStorageOperation panics if the request's operation is invalid. func (d *Daemon) performStorageOperation(ctx context.Context, req Request) (si StorageInfo, err error) { var objects []StorageObject if objects, err = req.StorageObjects(); err != nil { return si, fmt.Errorf("Error generating storage objects: %w", err) } switch req.Operation { case OperationCreate: return d.storer.Put(ctx, objects...) case OperationDelete: return d.storer.Delete(ctx, objects...) default: const panicmsg = "Requests should have been validated before performStorageOperation is called. Got unknown operation: %q" panic(fmt.Sprintf(panicmsg, req.Operation)) } } // publishEventNotification takes a request and sends a notification to the // banner topic for edge-agent to notify stores that configuration has changed func (d *Daemon) publishEventNotification(ctx context.Context, req Request, objects ...StorageObject) error { switch req.Operation { case OperationCreate: return d.notify(ctx, req, objects...) case OperationDelete: //currently we do not consider delete events default: return nil } return nil } func (d *Daemon) notify(ctx context.Context, req Request, objects ...StorageObject) error { //create notification pubsub message to send to edge agent notification := eamodel.NewNotificationMessage(). SetActor(req.Owner). SetClusterEdgeID(req.Cluster) // boolean to indicate if kustomization needs to sync, only one sync message needs to be sent for n objects syncKustomization := false for _, obj := range objects { var event *eamodel.Event //StorageObject.Content is the string representation of an object, //converting it back to bytes to read object metadata obj, err := ParseYamlGVKNN([]byte(obj.Content)) if err != nil { return fmt.Errorf("error parsing storage object for notifier: %w", err) } switch obj.Kind { case helmApi.HelmReleaseKind: event = eamodel.BuildEdgeAgentEvent(obj.Metadata.Name, helmApi.HelmReleaseKind, constants.FluxSystem, eamodel.EventTypeReconcile) // for all other synced object we want to reconile the chariot-sync kustomization default: if syncKustomization { continue } event = eamodel.BuildEdgeAgentEvent(eamodel.ChariotSync, kustomizeApi.KustomizationKind, constants.FluxEdgeNamespace, eamodel.EventTypeReconcile) //set value to true to not create any more kustomization sync events syncKustomization = true } if event != nil { notification.AddEvent(*event) } } if len(notification.Events) > 0 { // create pubsub client for the banner project the notification message will be published to psNotifierClient, err := pubsub.NewClient(ctx, req.Banner) if err != nil { return fmt.Errorf("error creating pubsub client for notifications: %w", err) } // defer closing pubsub client once message has been published defer psNotifierClient.Close() ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, psNotifierClient, req.Banner, eamodel.EdgeAgentTopicAndOwner) if err != nil { return fmt.Errorf("error fetching pubsub topic for notifications: %w", err) } // defer cleanup of topic goroutines and resources once message has been published defer ipsnotifier.Stop() // publish notification return ipsnotifier.Publish(ctx, notification) } return nil } // handleRequest takes a pubsub message and performs the operation. func (d *Daemon) handleRequest(ctx context.Context, ipsm IPubSubMessage) { var ackdeadline, cancel = context.WithTimeout(ctx, AckDeadline) defer cancel() // Create chariot pubsub response that satisfies the IPubSubMessage interface. var resp = &chariotResponseMessage{ requestID: ipsm.ID(), } // Log metrics for every request. var req Request defer func() { var successful = resp.err == nil d.metrics.IncRequestsTotal(req, successful) // record metrics about errors. if !successful { d.metrics.IncErrorsTotal(req) if errors.Is(resp.err, context.DeadlineExceeded) { d.metrics.IncDeadlineExceededTotal(AckDeadline) } var gerr *googleapi.Error if errors.As(resp.err, &gerr) { d.metrics.IncGoogleAPIErrorsTotal(resp.err) } } }() // Decode the json Chariot request if err := json.Unmarshal(ipsm.Data(), &req); err != nil { // This error can't be retried. resp.err = fmt.Errorf("Error decoding request JSON: %w", err) logRequestJSONError(ipsm, resp.err) // Send an error response. if err = d.response.Publish(ackdeadline, resp); err != nil { logResponseError(ipsm.ID(), err) } ipsm.Ack() // This error can't be retried. return } // Set the owner and operation fields in the response. // The owner field is used to identify pubsub response recipients. resp.owner = req.Owner resp.operation = req.Operation if err := req.Validate(); err != nil { resp.err = fmt.Errorf("Request validation error: %w", err) logRequestJSONError(ipsm, resp.err) // Send an error response if err = d.response.Publish(ctx, resp); err != nil { logResponseError(ipsm.ID(), err) } ipsm.Ack() // This error can't be retried. return } // Perform the storage operation var si StorageInfo si, resp.err = d.performStorageOperation(ackdeadline, req) // Log & record metrics every time performStorageOperation is called. Do it before checking errors. d.metrics.IncStorageOperationsTotal(req, si) logStorageInfo(ipsm.ID(), si) if resp.err != nil { var reason = fmt.Sprintf("Storage operation failed: %v", resp.err) logRetryEvent(ipsm, reason) // Log more info about the error when able. // The error can be both a google api error and a deadline error thanks to the errorsnode package. if errors.Is(resp.err, context.DeadlineExceeded) { logAckDeadlineExpired(ipsm, "Ack deadline reached while performing storage operation") } if IsGoogleAPIError(resp.err) { logGoogleAPIErr(ipsm, resp.err) } // Nack without publishing a response since the request will be retried. ipsm.Nack() return } // If we reach here, we can publish the response if err := d.response.Publish(ctx, resp); err != nil { logResponseError(ipsm.ID(), err) } ipsm.Ack() if req.Notify { //publish to edge agent topic for every successful storage put opteration if err := d.publishEventNotification(ackdeadline, req, si.ObjectsPut...); err != nil { logNotificationError(resp.requestID, err) } } } const LogAckDeadlineExpiredMessage = "Ack deadline expired" type LogAckDeadlineExpiredMessageObject struct { Message string `json:"message"` Severity string `json:"severity"` RequestID string `json:"pubsub_request_id"` TimeoutDuration string `json:"timeout_duration"` Why string `json:"why,omitempty"` } func logAckDeadlineExpired(ipsm IPubSubMessage, why string) { var l = LogAckDeadlineExpiredMessageObject{ Message: LogAckDeadlineExpiredMessage, Severity: sevError, RequestID: ipsm.ID(), TimeoutDuration: fmt.Sprintf("%v", AckDeadline), Why: why, } loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(l) } type LogRetryEventMessageObject struct { Message string `json:"message"` Severity string `json:"severity"` RequestID string `json:"pubsub_request_id"` Reason string `json:"retry_reason"` } const LogRetryEventMessage = "Retry chariot request" func logRetryEvent(ipsm IPubSubMessage, reason string) { var l = LogRetryEventMessageObject{ Message: LogRetryEventMessage, Severity: sevInfo, RequestID: ipsm.ID(), Reason: reason, } loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(l) } type LogRequestJSONErrorMessageObject struct { Message string `json:"message"` Severity string `json:"severity"` Err string `json:"error"` Data []byte `json:"pubsub_data"` RequestID string `json:"pubsub_request_id"` } const LogRequestJSONErrorMessage = "Error parsing request JSON" func logRequestJSONError(ipsm IPubSubMessage, err error) { var lrjemo = LogRequestJSONErrorMessageObject{ Message: LogRequestJSONErrorMessage, Severity: sevError, Err: err.Error(), Data: ipsm.Data(), RequestID: ipsm.ID(), } loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(lrjemo) } const SuccessLogMessage = "Successfully processed pubsub message" const FailureLogMessage = "Failed to process pubsub message" // PubSubLogMessageObject is logged when an Ack occurs. type PubSubLogMessageObject struct { Severity string `json:"severity"` // "info", "error" Message string `json:"message"` // The log message // Err is an error that caused Chariot to not process the reqest. Err string `json:"error,omitempty"` // PubSub info ID string `json:"pubsub_id"` Data []byte `json:"pubsub_data,omitempty"` Attributes map[string]string `json:"pubsub_attributes,omitempty"` PublishTime time.Time `json:"pubsub_publish_time,omitempty"` DeliveryAttempt int `json:"pubsub_delivery_attempt,omitempty"` OrderingKey string `json:"pubsub_ordering_key,omitempty"` } const ResponseErrorLogMessage = "Error publishing pubsub response" type ResponseErrorLogMessageObject struct { Severity string `json:"severity"` // "error" Message string `json:"message"` // set to ResponseErrorLogMessage // RequestID is logged to tie StorageInfo logs to their requests. RequestID string `json:"request_pubsub_id"` Error string `json:"error"` // The error returned by the Publish method. } func logResponseError(reqID string, err error) { loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(ResponseErrorLogMessageObject{ Severity: sevError, Message: ResponseErrorLogMessage, RequestID: reqID, Error: err.Error(), }) } const LogNotificationErrorMessage = "Error sending notification message" type NotificationErrorLogMessageObject struct { Severity string `json:"severity"` Message string `json:"message"` // set to LogNotificationErrorMessage // RequestID is logged to tie Notification logs to their requests. RequestID string `json:"request_pubsub_id"` Error string `json:"error"` // error returned by the Publish method } func logNotificationError(reqID string, err error) { loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(NotificationErrorLogMessageObject{ Severity: sevError, Message: LogNotificationErrorMessage, RequestID: reqID, Error: err.Error(), }) } const StorageInfoLogMessage = "Storage Info" type StorageInfoLogMessageObject struct { Severity string `json:"severity"` // "info", "error" Message string `json:"message"` // The log message // RequestID is logged to tie StorageInfo logs to their requests. RequestID string `json:"request_pubsub_id"` StorageInfo StorageInfo `json:"storage_info,omitempty"` } func logStorageInfo(reqID string, si StorageInfo) { var sev = sevInfo var msg = StorageInfoLogMessage if len(si.ObjectsDoNotExist) > 0 { sev = sevWarning } if len(si.Errors) > 0 { sev = sevError } else { // No errors, so we **minimize data logged** for i := range si.ObjectsPut { si.ObjectsPut[i].Content = "" } for i := range si.ObjectsDeleted { si.ObjectsDeleted[i].Content = "" } } loggerOutput.Lock() defer loggerOutput.Unlock() //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(StorageInfoLogMessageObject{ Severity: sev, Message: msg, RequestID: reqID, StorageInfo: si, }) } func IsGoogleAPIError(err error) bool { var gerr *googleapi.Error return errors.As(err, &gerr) } const LogGoogleAPIErrMessage = "Googleapi Error" type LogGoogleAPIErrMessageObject struct { Severity string `json:"severity"` // error Message string `json:"message"` RequestID string `json:"pubsub_request_id"` Error string `json:"error"` Code int `json:"response_status_code"` Header http.Header `json:"response_header"` Body string `json:"response_body"` Errors []googleapi.ErrorItem `json:"errors"` } func logGoogleAPIErr(ipsm IPubSubMessage, err error) { var gerr *googleapi.Error if ok := errors.As(err, &gerr); !ok { return } loggerOutput.Lock() defer loggerOutput.Unlock() var l = LogGoogleAPIErrMessageObject{ Severity: sevError, Message: LogGoogleAPIErrMessage, RequestID: ipsm.ID(), Error: err.Error(), Code: gerr.Code, Header: gerr.Header, // TODO for security, remove header from logs after confirming/denying `Retry-After` is present, Body: gerr.Body, Errors: gerr.Errors, } //nolint:errcheck json.NewEncoder(loggerOutput.rw).Encode(l) }