package http import ( "encoding/json" "errors" "fmt" "net/http" "edge-infra.dev/pkg/edge/datasync/internal/config" "edge-infra.dev/pkg/lib/fog" "github.com/google/uuid" "edge-infra.dev/pkg/edge/datasync/chirp/model" "edge-infra.dev/pkg/edge/datasync/chirp/persister" "edge-infra.dev/pkg/edge/datasync/chirp/persister/files" dsHTTP "edge-infra.dev/pkg/edge/datasync/http" protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload" "edge-infra.dev/pkg/edge/datasync/internal/serializer" ) var _filePersister persister.MessagePersister var httpMetrics *Metrics func NewMsgServer(shouldRun chan bool, cfg *config.Config) (*dsHTTP.Server, persister.MessagePersister) { _filePersister, _ = files.NewFilePersister(cfg) serverMux := http.NewServeMux() serverMux.HandleFunc("/send-message", SendMessage) serverMux.HandleFunc("/send-messages", SendMessages) httpMetrics = NewMetrics() return dsHTTP.NewServer(cfg.MsgPort, serverMux, shouldRun), _filePersister } func SendMessage(responseWriter http.ResponseWriter, request *http.Request) { decoder := json.NewDecoder(request.Body) decoder.DisallowUnknownFields() logger := fog.New() httpMetrics.DatasyncMessagingHTTPMetrics.Inc() var message protos.Message err := decoder.Decode(&message) if err != nil { logger.Error(err, "failed to decode request") result := &protos.SendMessageResult{ MessageID: "", IsError: true, ErrorDescription: err.Error(), ErrorCode: "InvalidRequest", } sendMessageResult, _ := serializer.ProtobufToJSON(result) fmt.Fprint(responseWriter, sendMessageResult) return } err = persistMessage(&message) if err != nil { result := &protos.SendMessageResult{ MessageID: message.ID, IsError: err != nil, ErrorDescription: err.Error(), ErrorCode: "FailedToSaveMessage", } sendMessageResult, _ := serializer.ProtobufToJSON(result) fmt.Fprint(responseWriter, sendMessageResult) return } sendMessageResult := &protos.SendMessageResult{ MessageID: message.ID, IsError: err != nil, } sendMessageResultJSON, _ := serializer.ProtobufToJSON(sendMessageResult) fmt.Fprint(responseWriter, sendMessageResultJSON) } func SendMessages(responseWriter http.ResponseWriter, request *http.Request) { decoder := json.NewDecoder(request.Body) decoder.DisallowUnknownFields() logger := fog.New() var batchRequest protos.SendMessagesBatchRequest err := decoder.Decode(&batchRequest) if err != nil { logger.Error(err, "failed to decode batch request") result := &protos.SendMessageResult{ MessageID: "", IsError: true, ErrorDescription: err.Error(), ErrorCode: "InvalidBatchRequest", } batchResult := &protos.SendMessagesBatchResponse{ BatchID: "", HaveErrors: true, Results: []*protos.SendMessageResult{result}, } sendMessageResult, _ := serializer.ProtobufToJSON(batchResult) fmt.Fprint(responseWriter, sendMessageResult) return } haveErrors := false failuresCounter := 0 var sendMessageResults []*protos.SendMessageResult var currentResult *protos.SendMessageResult batchSize := len(batchRequest.Messages) httpMetrics.DatasyncMessagingHTTPMetrics.Add(float64(batchSize)) for _, message := range batchRequest.Messages { currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: false} err := persistMessage(message) if err != nil { haveErrors = true failuresCounter++ currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: true, ErrorCode: "FailedToSaveMessage", ErrorDescription: err.Error()} } sendMessageResults = append(sendMessageResults, currentResult) } if !haveErrors { logger.Info("finished sending batch successfully") } else { logger.Info("partially succeeded to send batch") } sendMessageBatchResponse := &protos.SendMessagesBatchResponse{ BatchID: batchRequest.BatchID, HaveErrors: haveErrors, Results: sendMessageResults, } sendMessageBatchResponseJSON, _ := serializer.ProtobufToJSON(sendMessageBatchResponse) fmt.Fprint(responseWriter, sendMessageBatchResponseJSON) } func validateMessage(message *protos.Message) error { // validate message id _, err := uuid.Parse(message.ID) if err != nil { errorMessage := fmt.Sprintf("invalid message id '%v'\n", message.ID) return errors.New(errorMessage) } // validate payload if len(message.Payload) == 0 { errorMessage := fmt.Sprintf("message '%v' doesn't have payload\n", message.ID) return errors.New(errorMessage) } return nil } func persistMessage(message *protos.Message) error { err := validateMessage(message) if err != nil { return err } logger := fog.New() messageID, _ := uuid.Parse(message.ID) messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload} err = _filePersister.Save(messageData) if err != nil { logger.Error(err, "failed to save message for sending") errorMessage := fmt.Sprintf("failed to save message id '%v' for sending", message.ID) return errors.New(errorMessage) } return nil }