package worker

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
	"edge-infra.dev/pkg/edge/datasync/shoot/model"
	"edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
	"edge-infra.dev/pkg/lib/fog"
)

const (
	// readDelay is the backoff applied before retrying a failed Kafka read.
	readDelay = 2 * time.Second
)

type MainWorker struct {
	topic          string
	consumer       kafkaclient.Consumer
	publisher      pubsub.Publisher
	failureHandler *handler.FailureHandler
	metrics        *Metrics
	logger         logr.Logger
}

// NewMainWorker creates a new worker that, once started, consumes messages
// from a Kafka topic and publishes them to Google Pub/Sub.
func NewMainWorker(topic string, c kafkaclient.Consumer, p pubsub.Publisher, failureHandler *handler.FailureHandler, metrics *Metrics) Worker {
	return &MainWorker{
		topic:          topic,
		consumer:       c,
		publisher:      p,
		failureHandler: failureHandler,
		metrics:        metrics,
		logger:         fog.New(),
	}
}

// Start runs the consume/publish loop until the consumer is stopped.
func (w *MainWorker) Start() {
	w.metrics.Reset()
	logger := w.logger
	ctx := context.Background()
	for {
		messages, err := w.consumer.Read(ctx)
		if err != nil {
			logger.Error(err, "kafka error while trying to read messages from topic. will continue to retry.")
			time.Sleep(readDelay)
			continue
		}
		if len(messages) == 0 {
			continue
		}
		w.metrics.MessagesReadTotal.WithLabelValues(w.topic).Add(float64(len(messages)))

		messagesBulk := ConvertToMessagesModel(messages)
		failedToBePublished := w.publisher.Publish(messagesBulk)
		w.metrics.MessagesFailedToBePublishedTotal.WithLabelValues(w.topic).Add(float64(len(failedToBePublished)))
		if len(failedToBePublished) == 0 {
			w.metrics.MessagesSentTotal.WithLabelValues(w.topic).Add(float64(len(messages)))
			w.ack(ctx, messages)
			continue
		}

		logger.Error(nil, "failed to publish messages", "failed", len(failedToBePublished), "total", len(messages), "topic", w.topic)
		failedMessageIDs := getFailedMessagesID(failedToBePublished)
		failedMessages := filterMessagesByIDs(messages, failedMessageIDs)
		// Hand the failed messages off to the failure handler; if that also
		// fails, skip the ack so the whole bulk is re-read and retried.
		if err := w.failureHandler.Handle(failedMessages); err != nil {
			logger.Error(err, "failed to handle messages")
			continue
		}
		w.ack(ctx, messages)
	}
}

// Stop signals the consumer to stop consuming messages from Kafka.
func (w *MainWorker) Stop() {
	w.consumer.Stop()
	// TODO: stop producer & publisher
}

// ack acknowledges the whole bulk by acking only its last record.
func (w *MainWorker) ack(ctx context.Context, messages []*kafkaclient.Record) {
	message := messages[len(messages)-1]
	err := w.consumer.Ack(ctx, message)
	logger := w.logger
	if err != nil {
		logger.Error(err, "failed to ack bulk by last message. the whole bulk will be retried soon")
		w.metrics.KafkaFailedAcksTotal.WithLabelValues(w.topic).Inc()
		return
	}
	logger.Info("successfully acked last message in bulk")
	for _, msg := range messages {
		if msgID := getSuccessfulMessagesID(msg); msgID != "" {
			logger.Info(fmt.Sprintf("successfully acked message ID: %s", msgID))
		}
	}
	w.metrics.KafkaSuccessfulAcksTotal.WithLabelValues(w.topic).Inc()
	w.metrics.KafkaLastAckedMessageGauge.
		WithLabelValues(w.topic, strconv.FormatInt(int64(message.Partition), 10)).
		Set(float64(message.Offset))
}

// getFailedMessagesID extracts the Kafka source IDs (topic/partition/offset)
// of the messages that failed to be published.
func getFailedMessagesID(failures []*pubsub.PublishFailureResult) []model.SourceID {
	result := make([]model.SourceID, 0, len(failures))
	for _, failure := range failures {
		id := model.SourceID{
			Topic:     failure.Message.ID.Topic,
			Partition: failure.Message.ID.Partition,
			Offset:    failure.Message.ID.Offset,
		}
		result = append(result, id)
	}
	return result
}

// filterMessagesByIDs returns the records whose topic, partition, and offset
// match one of the given source IDs.
func filterMessagesByIDs(messages []*kafkaclient.Record, ids []model.SourceID) []*kafkaclient.Record {
	result := make([]*kafkaclient.Record, 0, len(ids))
	for _, id := range ids {
		for _, msg := range messages {
			if msg.Topic == id.Topic && msg.Partition == id.Partition && msg.Offset == id.Offset {
				result = append(result, msg)
			}
		}
	}
	return result
}

// getSuccessfulMessagesID returns the value of the record's "id" header,
// or an empty string if the header is not present.
func getSuccessfulMessagesID(message *kafkaclient.Record) string {
	for _, header := range message.Headers {
		if header.Key == "id" {
			return string(header.Value)
		}
	}
	return ""
}
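// Minimal usage sketch (illustration only): the constructors for the consumer,
// publisher, failure handler, and metrics below are hypothetical placeholders;
// the real ones live elsewhere in this repo and may differ.
//
//	consumer := newConsumer(brokers, groupID, topic)        // assumed helper
//	publisher := newPublisher(projectID, pubsubTopic)       // assumed helper
//	failureHandler := newFailureHandler(localStore)         // assumed helper
//	metrics := newMetrics(registerer)                       // assumed helper
//	w := NewMainWorker(topic, consumer, publisher, failureHandler, metrics)
//	go w.Start()
//	defer w.Stop()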