...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/server/grpc/msg.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/server/grpc

     1  package grpc
     2  
     3  import (
     4  	"io"
     5  
     6  	"github.com/go-logr/logr"
     7  
     8  	"edge-infra.dev/pkg/lib/fog"
     9  
    10  	"github.com/google/uuid"
    11  	"golang.org/x/net/context"
    12  
    13  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
    14  	persister "edge-infra.dev/pkg/edge/datasync/chirp/persister"
    15  
    16  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    17  	protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload"
    18  )
    19  
    20  type MessagingService struct {
    21  	messagePersister persister.MessagePersister
    22  	logger           logr.Logger
    23  	config           *config.Config
    24  	metrics          *Metrics
    25  }
    26  
    27  // New messaging service to work with
    28  func NewMessagingService(msgPersister persister.MessagePersister, cfg *config.Config) *MessagingService {
    29  	return &MessagingService{messagePersister: msgPersister, logger: fog.New(), config: cfg, metrics: NewMetrics()}
    30  }
    31  
    32  // SendMessage
    33  func (s *MessagingService) SendMessage(_ context.Context, request *protos.Message) (*protos.SendMessageResult, error) {
    34  	s.logger.Info("Received SendMessage request from client")
    35  	// logger.WithFields(map[string]interface{}{
    36  	// 	"source":       "gRPCServer.SendMessage",
    37  	// 	"message-type": request.Type,
    38  	// 	"message-id":   request.ID,
    39  	// }).Info("Received SendMessage request from client")
    40  
    41  	messageID, _ := uuid.Parse(request.ID)
    42  	s.metrics.DatasyncMessagingGRPCMetrics.Inc()
    43  	messageData := model.MessageData{ID: messageID, Type: request.Type, Payload: request.Payload}
    44  	err := s.messagePersister.Save(messageData)
    45  	if err != nil {
    46  		s.logger.Error(err, "failed to save message")
    47  		// logger.WithFields(map[string]interface{}{
    48  		// 	"source":      "file-persister.SaveMessage",
    49  		// 	"messageType": request.Type,
    50  		// 	"messageId":   messageID,
    51  		// 	"error":       err.Error(),
    52  		// }).Error("failed to save message")
    53  	}
    54  
    55  	return &protos.SendMessageResult{MessageID: request.ID, IsError: err != nil}, nil
    56  }
    57  
    58  func (s *MessagingService) SendMessagesBatch(_ context.Context, request *protos.SendMessagesBatchRequest) (*protos.SendMessagesBatchResponse, error) {
    59  	var messageID uuid.UUID
    60  	var sendMessageResults []*protos.SendMessageResult
    61  
    62  	var currentSendMessageResult *protos.SendMessageResult
    63  	haveErrors := false
    64  	failuresCounter := 0
    65  	// Total number of messages in the batch
    66  	batchSize := len(request.Messages)
    67  	s.metrics.DatasyncMessagingGRPCMetrics.Add(float64(batchSize))
    68  	for _, message := range request.Messages {
    69  		messageID, _ = uuid.Parse(message.ID)
    70  
    71  		messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
    72  		err := s.messagePersister.Save(messageData)
    73  
    74  		if err != nil {
    75  			haveErrors = true
    76  			failuresCounter++
    77  
    78  			if err != nil {
    79  				s.logger.Error(err, "failed to save message")
    80  				// logger.WithFields(map[string]interface{}{
    81  				// 	"source":      "file-persister.SaveMessage",
    82  				// 	"messageType": message.Type,
    83  				// 	"messageId":   messageID,
    84  				// 	"error":       err.Error(),
    85  				// }).Error("failed to save message")
    86  			}
    87  		}
    88  
    89  		currentSendMessageResult = &protos.SendMessageResult{MessageID: messageID.String(), IsError: err != nil}
    90  
    91  		sendMessageResults = append(sendMessageResults, currentSendMessageResult)
    92  	}
    93  
    94  	if !haveErrors {
    95  		s.logger.Info("finished sending batch successfully")
    96  		// logger.WithFields(map[string]interface{}{
    97  		// 	"source":   "gRPCServer.SendMessagesBatch",
    98  		// 	"sent":     len(request.Messages),
    99  		// 	"batch-id": request.BatchID,
   100  		// }).Info("finished sending batch successfully")
   101  	} else {
   102  		s.logger.Info("partially succeeded to send batch")
   103  		// logger.WithFields(map[string]interface{}{
   104  		// 	"source":   "gRPCServer.SendMessagesBatch",
   105  		// 	"sent":     len(request.Messages),
   106  		// 	"failed":   failuresCounter,
   107  		// 	"batch-id": request.BatchID,
   108  		// }).Warning("partially succeeded to send batch")
   109  	}
   110  
   111  	return &protos.SendMessagesBatchResponse{BatchID: request.BatchID, Results: sendMessageResults, HaveErrors: haveErrors}, nil
   112  }
   113  
   114  // SendMessages gets a stream of messages, and responds with a summary
   115  func (s *MessagingService) StreamMessage(stream protos.MessagingService_StreamMessageServer) error {
   116  	for s.config.RunMessagingService {
   117  		message, err := stream.Recv()
   118  
   119  		if err == io.EOF {
   120  			s.logger.Info("client has closed connection")
   121  			// logger.WithFields(map[string]interface{}{
   122  			// 	"source": "gRPCServer.StreamMessage",
   123  			// }).Info("client has closed connection")
   124  
   125  			return nil
   126  		}
   127  
   128  		if err != nil {
   129  			s.logger.Error(err, "unable to read from client")
   130  			// logger.WithFields(map[string]interface{}{
   131  			// 	"source": "gRPCServer.StreamMessage",
   132  			// 	"error":  err,
   133  			// }).Error("unable to read from client")
   134  
   135  			return err
   136  		}
   137  
   138  		// TODO: instead of returning bool, return struct with: IsError, ErrorCode, ErrorMessage
   139  		messageID, _ := uuid.Parse(message.ID)
   140  		messageData := model.MessageData{ID: messageID, Type: message.Type, Payload: message.Payload}
   141  		saveErr := s.messagePersister.Save(messageData)
   142  		if saveErr != nil {
   143  			s.logger.Error(err, "failed to save message")
   144  			// logger.WithFields(map[string]interface{}{
   145  			// 	"source":      "file-persister.SaveMessage",
   146  			// 	"messageType": message.Type,
   147  			// 	"messageId":   messageID,
   148  			// 	"error":       saveErr.Error(),
   149  			// }).Error("failed to save message")
   150  		}
   151  
   152  		result := &protos.SendMessageResult{MessageID: message.ID, IsError: saveErr != nil}
   153  		s.logger.Info("server replied to message")
   154  		// logger.WithFields(map[string]interface{}{
   155  		// 	"source":     "gRPCServer.StreamMessage",
   156  		// 	"message-id": result.MessageID,
   157  		// }).Info("server replied to message")
   158  
   159  		err = stream.Send(result)
   160  
   161  		if err != nil {
   162  			return err
   163  		}
   164  	}
   165  
   166  	return nil
   167  }
   168  

View as plain text