1 package http
2
3 import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "net/http"
8
9 "edge-infra.dev/pkg/edge/datasync/internal/config"
10 "edge-infra.dev/pkg/lib/fog"
11
12 "github.com/google/uuid"
13
14 "edge-infra.dev/pkg/edge/datasync/chirp/model"
15 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
16 "edge-infra.dev/pkg/edge/datasync/chirp/persister/files"
17 dsHTTP "edge-infra.dev/pkg/edge/datasync/http"
18 protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload"
19 "edge-infra.dev/pkg/edge/datasync/internal/serializer"
20 )
21
22 var _filePersister persister.MessagePersister
23 var httpMetrics *Metrics
24
25 func NewMsgServer(shouldRun chan bool, cfg *config.Config) (*dsHTTP.Server, persister.MessagePersister) {
26 _filePersister, _ = files.NewFilePersister(cfg)
27
28 serverMux := http.NewServeMux()
29
30 serverMux.HandleFunc("/send-message", SendMessage)
31 serverMux.HandleFunc("/send-messages", SendMessages)
32 httpMetrics = NewMetrics()
33 return dsHTTP.NewServer(cfg.MsgPort, serverMux, shouldRun), _filePersister
34 }
35
36 func SendMessage(responseWriter http.ResponseWriter, request *http.Request) {
37 decoder := json.NewDecoder(request.Body)
38 decoder.DisallowUnknownFields()
39
40 logger := fog.New()
41 httpMetrics.DatasyncMessagingHTTPMetrics.Inc()
42 var message protos.Message
43 err := decoder.Decode(&message)
44 if err != nil {
45 logger.Error(err, "failed to decode request")
46 result := &protos.SendMessageResult{
47 MessageID: "",
48 IsError: true,
49 ErrorDescription: err.Error(),
50 ErrorCode: "InvalidRequest",
51 }
52 sendMessageResult, _ := serializer.ProtobufToJSON(result)
53 fmt.Fprint(responseWriter, sendMessageResult)
54 return
55 }
56
57 err = persistMessage(&message)
58
59 if err != nil {
60 result := &protos.SendMessageResult{
61 MessageID: message.ID,
62 IsError: err != nil,
63 ErrorDescription: err.Error(),
64 ErrorCode: "FailedToSaveMessage",
65 }
66
67 sendMessageResult, _ := serializer.ProtobufToJSON(result)
68 fmt.Fprint(responseWriter, sendMessageResult)
69 return
70 }
71
72 sendMessageResult := &protos.SendMessageResult{
73 MessageID: message.ID,
74 IsError: err != nil,
75 }
76
77 sendMessageResultJSON, _ := serializer.ProtobufToJSON(sendMessageResult)
78
79 fmt.Fprint(responseWriter, sendMessageResultJSON)
80 }
81
82 func SendMessages(responseWriter http.ResponseWriter, request *http.Request) {
83 decoder := json.NewDecoder(request.Body)
84 decoder.DisallowUnknownFields()
85
86 logger := fog.New()
87
88 var batchRequest protos.SendMessagesBatchRequest
89 err := decoder.Decode(&batchRequest)
90 if err != nil {
91 logger.Error(err, "failed to decode batch request")
92 result := &protos.SendMessageResult{
93 MessageID: "",
94 IsError: true,
95 ErrorDescription: err.Error(),
96 ErrorCode: "InvalidBatchRequest",
97 }
98 batchResult := &protos.SendMessagesBatchResponse{
99 BatchID: "",
100 HaveErrors: true,
101 Results: []*protos.SendMessageResult{result},
102 }
103
104 sendMessageResult, _ := serializer.ProtobufToJSON(batchResult)
105 fmt.Fprint(responseWriter, sendMessageResult)
106 return
107 }
108
109 haveErrors := false
110 failuresCounter := 0
111 var sendMessageResults []*protos.SendMessageResult
112 var currentResult *protos.SendMessageResult
113
114 batchSize := len(batchRequest.Messages)
115 httpMetrics.DatasyncMessagingHTTPMetrics.Add(float64(batchSize))
116 for _, message := range batchRequest.Messages {
117 currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: false}
118
119 err := persistMessage(message)
120
121 if err != nil {
122 haveErrors = true
123 failuresCounter++
124 currentResult = &protos.SendMessageResult{MessageID: message.ID, IsError: true, ErrorCode: "FailedToSaveMessage", ErrorDescription: err.Error()}
125 }
126
127 sendMessageResults = append(sendMessageResults, currentResult)
128 }
129
130 if !haveErrors {
131 logger.Info("finished sending batch successfully")
132 } else {
133 logger.Info("partially succeeded to send batch")
134 }
135
136 sendMessageBatchResponse := &protos.SendMessagesBatchResponse{
137 BatchID: batchRequest.BatchID,
138 HaveErrors: haveErrors,
139 Results: sendMessageResults,
140 }
141
142 sendMessageBatchResponseJSON, _ := serializer.ProtobufToJSON(sendMessageBatchResponse)
143
144 fmt.Fprint(responseWriter, sendMessageBatchResponseJSON)
145 }
146
147 func validateMessage(message *protos.Message) error {
148
149 _, err := uuid.Parse(message.ID)
150
151 if err != nil {
152 errorMessage := fmt.Sprintf("invalid message id '%v'\n", message.ID)
153 return errors.New(errorMessage)
154 }
155
156
157 if len(message.Payload) == 0 {
158 errorMessage := fmt.Sprintf("message '%v' doesn't have payload\n", message.ID)
159 return errors.New(errorMessage)
160 }
161
162 return nil
163 }
164
165 func persistMessage(message *protos.Message) error {
166 err := validateMessage(message)
167 if err != nil {
168 return err
169 }
170
171 logger := fog.New()
172
173 messageID, _ := uuid.Parse(message.ID)
174 messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
175
176 err = _filePersister.Save(messageData)
177
178 if err != nil {
179 logger.Error(err, "failed to save message for sending")
180 errorMessage := fmt.Sprintf("failed to save message id '%v' for sending", message.ID)
181 return errors.New(errorMessage)
182 }
183
184 return nil
185 }
186
View as plain text