package sender import ( "context" "edge-infra.dev/pkg/edge/datasync/chirp/model" "edge-infra.dev/pkg/edge/datasync/chirp/persister" "edge-infra.dev/pkg/edge/datasync/internal/collections" "edge-infra.dev/pkg/edge/datasync/internal/config" "edge-infra.dev/pkg/edge/datasync/kafkaclient" "edge-infra.dev/pkg/lib/fog" "github.com/go-logr/logr" ) type kafkaSender struct { producer kafkaclient.Producer topicPerMessageType map[string]string routes []string messagePersister persister.MessagePersister metrics *metrics logger logr.Logger } func NewKafkaSender(p kafkaclient.Producer, msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender { return &kafkaSender{ producer: p, topicPerMessageType: config.GetTopics(chirpConfig), routes: config.GetRoutes(chirpConfig), messagePersister: msgPersister, metrics: newMetrics(), logger: fog.New(), } } func (s *kafkaSender) Send(messages map[string]model.Message) []model.Message { var sentMessages = make([]model.Message, 0) for _, message := range messages { isSent := s.sendMessage(message) if isSent { sentMessages = append(sentMessages, message) } } if len(sentMessages) != 0 { if len(messages) != len(sentMessages) { s.logger.Info("partially succeeded to send messages") } s.metrics.SentMessagesTotal.WithLabelValues( sentMessages[0].Type, sentMessages[0].TenantID).Add(float64(len(sentMessages))) _ = s.messagePersister.Delete(sentMessages) } else { s.logger.Info("no messages were sent") } return sentMessages } func (s *kafkaSender) sendMessage(message model.Message) bool { topic := s.getTargetTopicName(message.Type) record := &kafkaclient.Record{ Topic: topic, Value: message.Payload, Headers: []kafkaclient.Header{ {Key: "id", Value: []byte(message.ID.String())}, {Key: "type", Value: []byte(message.Type)}, {Key: "organization", Value: []byte(message.TenantID)}, {Key: "organization_name", Value: []byte(message.OrganizationName)}, {Key: "organization_id", Value: []byte(message.OrganizationID)}, {Key: "site_id", Value: []byte(message.SiteID)}, {Key: "site_name", Value: []byte(message.SiteName)}, {Key: "signature", Value: []byte(message.Signature)}, {Key: "created", Value: []byte(message.CreatedAt.String())}, }, } if err := s.producer.Produce(context.Background(), record); err != nil { s.logger.Error(err, "failed to enqueue message to kafka") return false } return true } func (s *kafkaSender) getTargetTopicName(messageType string) string { messageTypeWithFallback := messageType if !collections.Include(s.routes, messageType) { messageTypeWithFallback = "public" } return s.topicPerMessageType[messageTypeWithFallback] }