...
1 package sender
2
3 import (
4 "context"
5
6 "edge-infra.dev/pkg/edge/datasync/chirp/model"
7 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
8 "edge-infra.dev/pkg/edge/datasync/internal/collections"
9 "edge-infra.dev/pkg/edge/datasync/internal/config"
10 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
11 "edge-infra.dev/pkg/lib/fog"
12
13 "github.com/go-logr/logr"
14 )
15
16 type kafkaSender struct {
17 producer kafkaclient.Producer
18 topicPerMessageType map[string]string
19 routes []string
20 messagePersister persister.MessagePersister
21 metrics *metrics
22 logger logr.Logger
23 }
24
25 func NewKafkaSender(p kafkaclient.Producer, msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender {
26 return &kafkaSender{
27 producer: p,
28 topicPerMessageType: config.GetTopics(chirpConfig),
29 routes: config.GetRoutes(chirpConfig),
30 messagePersister: msgPersister,
31 metrics: newMetrics(),
32 logger: fog.New(),
33 }
34 }
35
36 func (s *kafkaSender) Send(messages map[string]model.Message) []model.Message {
37 var sentMessages = make([]model.Message, 0)
38
39 for _, message := range messages {
40 isSent := s.sendMessage(message)
41
42 if isSent {
43 sentMessages = append(sentMessages, message)
44 }
45 }
46
47 if len(sentMessages) != 0 {
48 if len(messages) != len(sentMessages) {
49 s.logger.Info("partially succeeded to send messages")
50 }
51 s.metrics.SentMessagesTotal.WithLabelValues(
52 sentMessages[0].Type, sentMessages[0].TenantID).Add(float64(len(sentMessages)))
53 _ = s.messagePersister.Delete(sentMessages)
54 } else {
55 s.logger.Info("no messages were sent")
56 }
57
58 return sentMessages
59 }
60
61 func (s *kafkaSender) sendMessage(message model.Message) bool {
62 topic := s.getTargetTopicName(message.Type)
63 record := &kafkaclient.Record{
64 Topic: topic,
65 Value: message.Payload,
66 Headers: []kafkaclient.Header{
67 {Key: "id", Value: []byte(message.ID.String())},
68 {Key: "type", Value: []byte(message.Type)},
69 {Key: "organization", Value: []byte(message.TenantID)},
70 {Key: "organization_name", Value: []byte(message.OrganizationName)},
71 {Key: "organization_id", Value: []byte(message.OrganizationID)},
72 {Key: "site_id", Value: []byte(message.SiteID)},
73 {Key: "site_name", Value: []byte(message.SiteName)},
74 {Key: "signature", Value: []byte(message.Signature)},
75 {Key: "created", Value: []byte(message.CreatedAt.String())},
76 },
77 }
78 if err := s.producer.Produce(context.Background(), record); err != nil {
79 s.logger.Error(err, "failed to enqueue message to kafka")
80 return false
81 }
82 return true
83 }
84
85 func (s *kafkaSender) getTargetTopicName(messageType string) string {
86 messageTypeWithFallback := messageType
87 if !collections.Include(s.routes, messageType) {
88 messageTypeWithFallback = "public"
89 }
90
91 return s.topicPerMessageType[messageTypeWithFallback]
92 }
93
View as plain text