...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/sender/kafka-sender.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/sender

     1  package sender
     2  
     3  import (
     4  	"context"
     5  
     6  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
     7  	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
     8  	"edge-infra.dev/pkg/edge/datasync/internal/collections"
     9  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    10  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    11  	"edge-infra.dev/pkg/lib/fog"
    12  
    13  	"github.com/go-logr/logr"
    14  )
    15  
    16  type kafkaSender struct {
    17  	producer            kafkaclient.Producer
    18  	topicPerMessageType map[string]string
    19  	routes              []string
    20  	messagePersister    persister.MessagePersister
    21  	metrics             *metrics
    22  	logger              logr.Logger
    23  }
    24  
    25  func NewKafkaSender(p kafkaclient.Producer, msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender {
    26  	return &kafkaSender{
    27  		producer:            p,
    28  		topicPerMessageType: config.GetTopics(chirpConfig),
    29  		routes:              config.GetRoutes(chirpConfig),
    30  		messagePersister:    msgPersister,
    31  		metrics:             newMetrics(),
    32  		logger:              fog.New(),
    33  	}
    34  }
    35  
    36  func (s *kafkaSender) Send(messages map[string]model.Message) []model.Message {
    37  	var sentMessages = make([]model.Message, 0)
    38  
    39  	for _, message := range messages {
    40  		isSent := s.sendMessage(message)
    41  
    42  		if isSent {
    43  			sentMessages = append(sentMessages, message)
    44  		}
    45  	}
    46  
    47  	if len(sentMessages) != 0 {
    48  		if len(messages) != len(sentMessages) {
    49  			s.logger.Info("partially succeeded to send messages")
    50  		}
    51  		s.metrics.SentMessagesTotal.WithLabelValues(
    52  			sentMessages[0].Type, sentMessages[0].TenantID).Add(float64(len(sentMessages)))
    53  		_ = s.messagePersister.Delete(sentMessages)
    54  	} else {
    55  		s.logger.Info("no messages were sent")
    56  	}
    57  
    58  	return sentMessages
    59  }
    60  
    61  func (s *kafkaSender) sendMessage(message model.Message) bool {
    62  	topic := s.getTargetTopicName(message.Type)
    63  	record := &kafkaclient.Record{
    64  		Topic: topic,
    65  		Value: message.Payload,
    66  		Headers: []kafkaclient.Header{
    67  			{Key: "id", Value: []byte(message.ID.String())},
    68  			{Key: "type", Value: []byte(message.Type)},
    69  			{Key: "organization", Value: []byte(message.TenantID)},
    70  			{Key: "organization_name", Value: []byte(message.OrganizationName)},
    71  			{Key: "organization_id", Value: []byte(message.OrganizationID)},
    72  			{Key: "site_id", Value: []byte(message.SiteID)},
    73  			{Key: "site_name", Value: []byte(message.SiteName)},
    74  			{Key: "signature", Value: []byte(message.Signature)},
    75  			{Key: "created", Value: []byte(message.CreatedAt.String())},
    76  		},
    77  	}
    78  	if err := s.producer.Produce(context.Background(), record); err != nil {
    79  		s.logger.Error(err, "failed to enqueue message to kafka")
    80  		return false
    81  	}
    82  	return true
    83  }
    84  
    85  func (s *kafkaSender) getTargetTopicName(messageType string) string {
    86  	messageTypeWithFallback := messageType
    87  	if !collections.Include(s.routes, messageType) {
    88  		messageTypeWithFallback = "public"
    89  	}
    90  
    91  	return s.topicPerMessageType[messageTypeWithFallback]
    92  }
    93  

View as plain text