...
package worker

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
	"edge-infra.dev/pkg/edge/datasync/shoot/model"
	"edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
	"edge-infra.dev/pkg/lib/fog"
)

const (
	// readDelay is how long the worker waits before retrying after a
	// failed Kafka read.
	readDelay = 2 * time.Second
)

// MainWorker consumes records from a Kafka topic, publishes them downstream,
// routes publish failures to a failure handler, and acknowledges each batch
// back to Kafka.
type MainWorker struct {
	topic          string
	consumer       kafkaclient.Consumer
	publisher      pubsub.Publisher
	failureHandler *handler.FailureHandler
	metrics        *Metrics
	logger         logr.Logger
}
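
// Compile-time assertion that *MainWorker satisfies the Worker interface
// returned by NewMainWorker.
var _ Worker = (*MainWorker)(nil)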

// NewMainWorker wires a Kafka consumer, a publisher, a failure handler, and
// metrics into a Worker for the given topic.
func NewMainWorker(topic string, c kafkaclient.Consumer, p pubsub.Publisher, failureHandler *handler.FailureHandler, metrics *Metrics) Worker {
	return &MainWorker{
		topic:          topic,
		consumer:       c,
		publisher:      p,
		failureHandler: failureHandler,
		metrics:        metrics,
		logger:         fog.New(),
	}
}
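
// A typical lifecycle, sketched for illustration only (the consumer,
// publisher, failureHandler, and metrics values are assumed to be
// constructed elsewhere; their constructors are not shown here):
//
//	w := NewMainWorker("orders", consumer, publisher, failureHandler, metrics)
//	go w.Start() // Start blocks, so run it in its own goroutine.
//	// ...
//	w.Stop()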

// Start runs the read-publish-ack loop. It blocks indefinitely; read errors
// are retried after readDelay. A batch is acknowledged only after every
// message has been published or handed to the failure handler, so on failure
// messages are redelivered rather than lost.
func (w *MainWorker) Start() {
	w.metrics.Reset()
	logger := w.logger
	ctx := context.Background()
	for {
		messages, err := w.consumer.Read(ctx)
		if err != nil {
			logger.Error(err, "kafka error while trying to read messages from topic. will continue to retry.")
			time.Sleep(readDelay)
			continue
		}

		if len(messages) == 0 {
			continue
		}

		w.metrics.MessagesReadTotal.WithLabelValues(w.topic).Add(float64(len(messages)))

		messagesBulk := ConvertToMessagesModel(messages)
		failedToBePublished := w.publisher.Publish(messagesBulk)

		w.metrics.MessagesFailedToBePublishedTotal.WithLabelValues(w.topic).Add(float64(len(failedToBePublished)))

		if len(failedToBePublished) == 0 {
			w.metrics.MessagesSentTotal.WithLabelValues(w.topic).Add(float64(len(messages)))
			w.ack(ctx, messages)
			continue
		}

		// err is always nil at this point (the read succeeded), so log a nil
		// error and report the number of messages that failed to publish.
		logger.Error(nil, "failed to publish messages", "count", len(failedToBePublished), "topic", w.topic)

		failedMessageIDs := getFailedMessagesID(failedToBePublished)
		failedMessages := filterMessagesByIDs(messages, failedMessageIDs)

		err = w.failureHandler.Handle(failedMessages)
		if err != nil {
			logger.Error(err, "failed to handle messages")
			continue
		}

		w.ack(ctx, messages)
	}
}

// Stop shuts down the underlying Kafka consumer.
func (w *MainWorker) Stop() {
	w.consumer.Stop()
}

// ack acknowledges a bulk of messages by acking only its last record; callers
// guarantee that messages is non-empty. If the ack fails, the whole bulk will
// be redelivered and retried.
func (w *MainWorker) ack(ctx context.Context, messages []*kafkaclient.Record) {
	logger := w.logger
	message := messages[len(messages)-1]
	err := w.consumer.Ack(ctx, message)
	if err != nil {
		logger.Error(err, "failed to ack bulk by last message. all bulk will be retried soon")
		w.metrics.KafkaFailedAcksTotal.WithLabelValues(w.topic).Inc()
	} else {
		logger.Info("successfully acked last message in bulk")
		for _, msg := range messages {
			msgID := getSuccessfulMessagesID(msg)
			if msgID != "" {
				logger.Info(fmt.Sprintf("successfully acked message ID: %s", msgID))
			}
		}
		w.metrics.KafkaSuccessfulAcksTotal.WithLabelValues(w.topic).Inc()
		w.metrics.KafkaLastAckedMessageGauge.
			WithLabelValues(w.topic, strconv.FormatInt(int64(message.Partition), 10)).Set(float64(message.Offset))
	}
}

// getFailedMessagesID extracts the Kafka source IDs (topic, partition,
// offset) of the messages that failed to be published.
func getFailedMessagesID(failures []*pubsub.PublishFailureResult) []model.SourceID {
	result := make([]model.SourceID, 0, len(failures))

	for _, failure := range failures {
		result = append(result, model.SourceID{
			Topic:     failure.Message.ID.Topic,
			Partition: failure.Message.ID.Partition,
			Offset:    failure.Message.ID.Offset,
		})
	}

	return result
}

// filterMessagesByIDs returns the records whose (topic, partition, offset)
// matches one of the given source IDs, in the order of ids.
func filterMessagesByIDs(messages []*kafkaclient.Record, ids []model.SourceID) []*kafkaclient.Record {
	result := make([]*kafkaclient.Record, 0, len(ids))

	for _, id := range ids {
		for _, msg := range messages {
			if msg.Topic == id.Topic &&
				msg.Partition == id.Partition &&
				msg.Offset == id.Offset {
				result = append(result, msg)
			}
		}
	}

	return result
}

// getSuccessfulMessagesID returns the value of the record's "id" header, or
// an empty string if the header is absent.
func getSuccessfulMessagesID(message *kafkaclient.Record) string {
	for _, header := range message.Headers {
		if header.Key == "id" {
			return string(header.Value)
		}
	}
	return ""
}