...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/server/http/msg.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/server/http

     1  package http
     2  
     3  import (
     4  	"encoding/json"
     5  	"errors"
     6  	"fmt"
     7  	"net/http"
     8  
     9  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    10  	"edge-infra.dev/pkg/lib/fog"
    11  
    12  	"github.com/google/uuid"
    13  
    14  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
    15  	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
    16  	"edge-infra.dev/pkg/edge/datasync/chirp/persister/files"
    17  	dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
    18  	protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload"
    19  	"edge-infra.dev/pkg/edge/datasync/internal/serializer"
    20  )
    21  
    22  var _filePersister persister.MessagePersister
    23  var httpMetrics *Metrics
    24  
    25  func NewMsgServer(shouldRun chan bool, cfg *config.Config) (*dsHTTP.Server, persister.MessagePersister) {
    26  	_filePersister, _ = files.NewFilePersister(cfg)
    27  
    28  	serverMux := http.NewServeMux()
    29  
    30  	serverMux.HandleFunc("/send-message", SendMessage)
    31  	serverMux.HandleFunc("/send-messages", SendMessages)
    32  	httpMetrics = NewMetrics()
    33  	return dsHTTP.NewServer(cfg.MsgPort, serverMux, shouldRun), _filePersister
    34  }
    35  
    36  func SendMessage(responseWriter http.ResponseWriter, request *http.Request) {
    37  	decoder := json.NewDecoder(request.Body)
    38  	decoder.DisallowUnknownFields()
    39  
    40  	logger := fog.New()
    41  	httpMetrics.DatasyncMessagingHTTPMetrics.Inc()
    42  	var message protos.Message
    43  	err := decoder.Decode(&message)
    44  	if err != nil {
    45  		logger.Error(err, "failed to decode request")
    46  		result := &protos.SendMessageResult{
    47  			MessageID:        "",
    48  			IsError:          true,
    49  			ErrorDescription: err.Error(),
    50  			ErrorCode:        "InvalidRequest",
    51  		}
    52  		sendMessageResult, _ := serializer.ProtobufToJSON(result)
    53  		fmt.Fprint(responseWriter, sendMessageResult)
    54  		return
    55  	}
    56  
    57  	err = persistMessage(&message)
    58  
    59  	if err != nil {
    60  		result := &protos.SendMessageResult{
    61  			MessageID:        message.ID,
    62  			IsError:          err != nil,
    63  			ErrorDescription: err.Error(),
    64  			ErrorCode:        "FailedToSaveMessage",
    65  		}
    66  
    67  		sendMessageResult, _ := serializer.ProtobufToJSON(result)
    68  		fmt.Fprint(responseWriter, sendMessageResult)
    69  		return
    70  	}
    71  
    72  	sendMessageResult := &protos.SendMessageResult{
    73  		MessageID: message.ID,
    74  		IsError:   err != nil,
    75  	}
    76  
    77  	sendMessageResultJSON, _ := serializer.ProtobufToJSON(sendMessageResult)
    78  
    79  	fmt.Fprint(responseWriter, sendMessageResultJSON)
    80  }
    81  
    82  func SendMessages(responseWriter http.ResponseWriter, request *http.Request) {
    83  	decoder := json.NewDecoder(request.Body)
    84  	decoder.DisallowUnknownFields()
    85  
    86  	logger := fog.New()
    87  
    88  	var batchRequest protos.SendMessagesBatchRequest
    89  	err := decoder.Decode(&batchRequest)
    90  	if err != nil {
    91  		logger.Error(err, "failed to decode batch request")
    92  		result := &protos.SendMessageResult{
    93  			MessageID:        "",
    94  			IsError:          true,
    95  			ErrorDescription: err.Error(),
    96  			ErrorCode:        "InvalidBatchRequest",
    97  		}
    98  		batchResult := &protos.SendMessagesBatchResponse{
    99  			BatchID:    "",
   100  			HaveErrors: true,
   101  			Results:    []*protos.SendMessageResult{result},
   102  		}
   103  
   104  		sendMessageResult, _ := serializer.ProtobufToJSON(batchResult)
   105  		fmt.Fprint(responseWriter, sendMessageResult)
   106  		return
   107  	}
   108  
   109  	haveErrors := false
   110  	failuresCounter := 0
   111  	var sendMessageResults []*protos.SendMessageResult
   112  	var currentResult *protos.SendMessageResult
   113  
   114  	batchSize := len(batchRequest.Messages)
   115  	httpMetrics.DatasyncMessagingHTTPMetrics.Add(float64(batchSize))
   116  	for _, message := range batchRequest.Messages {
   117  		currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: false}
   118  
   119  		err := persistMessage(message)
   120  
   121  		if err != nil {
   122  			haveErrors = true
   123  			failuresCounter++
   124  			currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: true, ErrorCode: "FailedToSaveMessage", ErrorDescription: err.Error()}
   125  		}
   126  
   127  		sendMessageResults = append(sendMessageResults, currentResult)
   128  	}
   129  
   130  	if !haveErrors {
   131  		logger.Info("finished sending batch successfully")
   132  	} else {
   133  		logger.Info("partially succeeded to send batch")
   134  	}
   135  
   136  	sendMessageBatchResponse := &protos.SendMessagesBatchResponse{
   137  		BatchID:    batchRequest.BatchID,
   138  		HaveErrors: haveErrors,
   139  		Results:    sendMessageResults,
   140  	}
   141  
   142  	sendMessageBatchResponseJSON, _ := serializer.ProtobufToJSON(sendMessageBatchResponse)
   143  
   144  	fmt.Fprint(responseWriter, sendMessageBatchResponseJSON)
   145  }
   146  
   147  func validateMessage(message *protos.Message) error {
   148  	// validate message id
   149  	_, err := uuid.Parse(message.ID)
   150  
   151  	if err != nil {
   152  		errorMessage := fmt.Sprintf("invalid message id '%v'\n", message.ID)
   153  		return errors.New(errorMessage)
   154  	}
   155  
   156  	// validate payload
   157  	if len(message.Payload) == 0 {
   158  		errorMessage := fmt.Sprintf("message '%v' doesn't have payload\n", message.ID)
   159  		return errors.New(errorMessage)
   160  	}
   161  
   162  	return nil
   163  }
   164  
   165  func persistMessage(message *protos.Message) error {
   166  	err := validateMessage(message)
   167  	if err != nil {
   168  		return err
   169  	}
   170  
   171  	logger := fog.New()
   172  
   173  	messageID, _ := uuid.Parse(message.ID)
   174  	messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
   175  
   176  	err = _filePersister.Save(messageData)
   177  
   178  	if err != nil {
   179  		logger.Error(err, "failed to save message for sending")
   180  		errorMessage := fmt.Sprintf("failed to save message id '%v' for sending", message.ID)
   181  		return errors.New(errorMessage)
   182  	}
   183  
   184  	return nil
   185  }
   186  

View as plain text