package grpc import ( "io" "github.com/go-logr/logr" "edge-infra.dev/pkg/lib/fog" "github.com/google/uuid" "golang.org/x/net/context" "edge-infra.dev/pkg/edge/datasync/chirp/model" persister "edge-infra.dev/pkg/edge/datasync/chirp/persister" "edge-infra.dev/pkg/edge/datasync/internal/config" protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload" ) type MessagingService struct { messagePersister persister.MessagePersister logger logr.Logger config *config.Config metrics *Metrics } // New messaging service to work with func NewMessagingService(msgPersister persister.MessagePersister, cfg *config.Config) *MessagingService { return &MessagingService{messagePersister: msgPersister, logger: fog.New(), config: cfg, metrics: NewMetrics()} } // SendMessage func (s *MessagingService) SendMessage(_ context.Context, request *protos.Message) (*protos.SendMessageResult, error) { s.logger.Info("Received SendMessage request from client") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.SendMessage", // "message-type": request.Type, // "message-id": request.ID, // }).Info("Received SendMessage request from client") messageID, _ := uuid.Parse(request.ID) s.metrics.DatasyncMessagingGRPCMetrics.Inc() messageData := model.MessageData{ID: messageID, Type: request.Type, Payload: request.Payload} err := s.messagePersister.Save(messageData) if err != nil { s.logger.Error(err, "failed to save message") // logger.WithFields(map[string]interface{}{ // "source": "file-persister.SaveMessage", // "messageType": request.Type, // "messageId": messageID, // "error": err.Error(), // }).Error("failed to save message") } return &protos.SendMessageResult{MessageID: request.ID, IsError: err != nil}, nil } func (s *MessagingService) SendMessagesBatch(_ context.Context, request *protos.SendMessagesBatchRequest) (*protos.SendMessagesBatchResponse, error) { var messageID uuid.UUID var sendMessageResults []*protos.SendMessageResult var currentSendMessageResult *protos.SendMessageResult haveErrors := false failuresCounter := 0 // Total number of messages in the batch batchSize := len(request.Messages) s.metrics.DatasyncMessagingGRPCMetrics.Add(float64(batchSize)) for _, message := range request.Messages { messageID, _ = uuid.Parse(message.ID) messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload} err := s.messagePersister.Save(messageData) if err != nil { haveErrors = true failuresCounter++ if err != nil { s.logger.Error(err, "failed to save message") // logger.WithFields(map[string]interface{}{ // "source": "file-persister.SaveMessage", // "messageType": message.Type, // "messageId": messageID, // "error": err.Error(), // }).Error("failed to save message") } } currentSendMessageResult = &protos.SendMessageResult{MessageID: messageID.String(), IsError: err != nil} sendMessageResults = append(sendMessageResults, currentSendMessageResult) } if !haveErrors { s.logger.Info("finished sending batch successfully") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.SendMessagesBatch", // "sent": len(request.Messages), // "batch-id": request.BatchID, // }).Info("finished sending batch successfully") } else { s.logger.Info("partially succeeded to send batch") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.SendMessagesBatch", // "sent": len(request.Messages), // "failed": failuresCounter, // "batch-id": request.BatchID, // }).Warning("partially succeeded to send batch") } return &protos.SendMessagesBatchResponse{BatchID: request.BatchID, Results: sendMessageResults, HaveErrors: haveErrors}, nil } // SendMessages gets a stream of messages, and responds with a summary func (s *MessagingService) StreamMessage(stream protos.MessagingService_StreamMessageServer) error { for s.config.RunMessagingService { message, err := stream.Recv() if err == io.EOF { s.logger.Info("client has closed connection") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.StreamMessage", // }).Info("client has closed connection") return nil } if err != nil { s.logger.Error(err, "unable to read from client") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.StreamMessage", // "error": err, // }).Error("unable to read from client") return err } // TODO: instead of returning bool, return struct with: IsError, ErrorCode, ErrorMessage messageID, _ := uuid.Parse(message.ID) messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload} saveErr := s.messagePersister.Save(messageData) if saveErr != nil { s.logger.Error(err, "failed to save message") // logger.WithFields(map[string]interface{}{ // "source": "file-persister.SaveMessage", // "messageType": message.Type, // "messageId": messageID, // "error": saveErr.Error(), // }).Error("failed to save message") } result := &protos.SendMessageResult{MessageID: message.ID, IsError: saveErr != nil} s.logger.Info("server replied to message") // logger.WithFields(map[string]interface{}{ // "source": "gRPCServer.StreamMessage", // "message-id": result.MessageID, // }).Info("server replied to message") err = stream.Send(result) if err != nil { return err } } return nil }