...
1 package grpc
2
3 import (
4 "io"
5
6 "github.com/go-logr/logr"
7
8 "edge-infra.dev/pkg/lib/fog"
9
10 "github.com/google/uuid"
11 "golang.org/x/net/context"
12
13 "edge-infra.dev/pkg/edge/datasync/chirp/model"
14 persister "edge-infra.dev/pkg/edge/datasync/chirp/persister"
15
16 "edge-infra.dev/pkg/edge/datasync/internal/config"
17 protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload"
18 )
19
20 type MessagingService struct {
21 messagePersister persister.MessagePersister
22 logger logr.Logger
23 config *config.Config
24 metrics *Metrics
25 }
26
27
28 func NewMessagingService(msgPersister persister.MessagePersister, cfg *config.Config) *MessagingService {
29 return &MessagingService{messagePersister: msgPersister, logger: fog.New(), config: cfg, metrics: NewMetrics()}
30 }
31
32
33 func (s *MessagingService) SendMessage(_ context.Context, request *protos.Message) (*protos.SendMessageResult, error) {
34 s.logger.Info("Received SendMessage request from client")
35
36
37
38
39
40
41 messageID, _ := uuid.Parse(request.ID)
42 s.metrics.DatasyncMessagingGRPCMetrics.Inc()
43 messageData := model.MessageData{ID: messageID, Type: request.Type, Payload: request.Payload}
44 err := s.messagePersister.Save(messageData)
45 if err != nil {
46 s.logger.Error(err, "failed to save message")
47
48
49
50
51
52
53 }
54
55 return &protos.SendMessageResult{MessageID: request.ID, IsError: err != nil}, nil
56 }
57
58 func (s *MessagingService) SendMessagesBatch(_ context.Context, request *protos.SendMessagesBatchRequest) (*protos.SendMessagesBatchResponse, error) {
59 var messageID uuid.UUID
60 var sendMessageResults []*protos.SendMessageResult
61
62 var currentSendMessageResult *protos.SendMessageResult
63 haveErrors := false
64 failuresCounter := 0
65
66 batchSize := len(request.Messages)
67 s.metrics.DatasyncMessagingGRPCMetrics.Add(float64(batchSize))
68 for _, message := range request.Messages {
69 messageID, _ = uuid.Parse(message.ID)
70
71 messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
72 err := s.messagePersister.Save(messageData)
73
74 if err != nil {
75 haveErrors = true
76 failuresCounter++
77
78 if err != nil {
79 s.logger.Error(err, "failed to save message")
80
81
82
83
84
85
86 }
87 }
88
89 currentSendMessageResult = &protos.SendMessageResult{MessageID: messageID.String(), IsError: err != nil}
90
91 sendMessageResults = append(sendMessageResults, currentSendMessageResult)
92 }
93
94 if !haveErrors {
95 s.logger.Info("finished sending batch successfully")
96
97
98
99
100
101 } else {
102 s.logger.Info("partially succeeded to send batch")
103
104
105
106
107
108
109 }
110
111 return &protos.SendMessagesBatchResponse{BatchID: request.BatchID, Results: sendMessageResults, HaveErrors: haveErrors}, nil
112 }
113
114
115 func (s *MessagingService) StreamMessage(stream protos.MessagingService_StreamMessageServer) error {
116 for s.config.RunMessagingService {
117 message, err := stream.Recv()
118
119 if err == io.EOF {
120 s.logger.Info("client has closed connection")
121
122
123
124
125 return nil
126 }
127
128 if err != nil {
129 s.logger.Error(err, "unable to read from client")
130
131
132
133
134
135 return err
136 }
137
138
139 messageID, _ := uuid.Parse(message.ID)
140 messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
141 saveErr := s.messagePersister.Save(messageData)
142 if saveErr != nil {
143 s.logger.Error(err, "failed to save message")
144
145
146
147
148
149
150 }
151
152 result := &protos.SendMessageResult{MessageID: message.ID, IsError: saveErr != nil}
153 s.logger.Info("server replied to message")
154
155
156
157
158
159 err = stream.Send(result)
160
161 if err != nil {
162 return err
163 }
164 }
165
166 return nil
167 }
168
View as plain text