...

Source file src/edge-infra.dev/pkg/edge/datasync/shoot/worker/main_worker.go

Documentation: edge-infra.dev/pkg/edge/datasync/shoot/worker

package worker

import (
	"context"
	"strconv"
	"time"

	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/edge/datasync/shoot/handler"
	"edge-infra.dev/pkg/edge/datasync/shoot/model"
	"edge-infra.dev/pkg/edge/datasync/shoot/pubsub"
	"edge-infra.dev/pkg/lib/fog"
)

const (
	// readDelay is how long the worker waits before retrying after a failed
	// Kafka read.
	readDelay = 2 * time.Second
)

// MainWorker consumes records from a Kafka topic and publishes them to
// Google Pub/Sub.
type MainWorker struct {
	topic          string
	consumer       kafkaclient.Consumer
	publisher      pubsub.Publisher
	failureHandler *handler.FailureHandler
	metrics        *Metrics
	logger         logr.Logger
}

// NewMainWorker creates a new worker that, once started, consumes messages
// from a Kafka topic and publishes them to Google Pub/Sub.
func NewMainWorker(topic string, c kafkaclient.Consumer, p pubsub.Publisher, failureHandler *handler.FailureHandler, metrics *Metrics) Worker {
	return &MainWorker{
		topic:          topic,
		consumer:       c,
		publisher:      p,
		failureHandler: failureHandler,
		metrics:        metrics,
		logger:         fog.New(),
	}
}

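// Start runs the consume-publish loop until Stop is called: read a batch of
// records from the Kafka topic, publish them to Pub/Sub, hand messages that
// failed to publish to the failure handler, and acknowledge the batch.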
func (w *MainWorker) Start() {
	w.metrics.Reset()
	logger := w.logger
	ctx := context.Background()
	for {
		messages, err := w.consumer.Read(ctx)
		if err != nil {
			logger.Error(err, "failed to read messages from Kafka topic; will keep retrying")
			time.Sleep(readDelay)
			continue
		}

		if len(messages) == 0 {
			continue
		}

		w.metrics.MessagesReadTotal.WithLabelValues(w.topic).Add(float64(len(messages)))

		messagesBulk := ConvertToMessagesModel(messages)
		failedToBePublished := w.publisher.Publish(messagesBulk)

		w.metrics.MessagesFailedToBePublishedTotal.WithLabelValues(w.topic).Add(float64(len(failedToBePublished)))

		if len(failedToBePublished) == 0 {
			w.metrics.MessagesSentTotal.WithLabelValues(w.topic).Add(float64(len(messages)))
			w.ack(ctx, messages)
			continue
		}

		// The read above succeeded, so err is nil at this point; log the
		// publish failure explicitly instead of the stale read error.
		logger.Error(nil, "failed to publish messages",
			"failed", len(failedToBePublished), "total", len(messages), "topic", w.topic)

		failedMessageIDs := getFailedMessagesID(failedToBePublished)
		failedMessages := filterMessagesByIDs(messages, failedMessageIDs)

		err = w.failureHandler.Handle(failedMessages)
		if err != nil {
			logger.Error(err, "failed to handle messages")
			continue
		}

		w.ack(ctx, messages)
	}
}

// Stop signals the consumer to stop consuming messages from Kafka
func (w *MainWorker) Stop() {
	w.consumer.Stop()
	// TODO: stop producer & publisher
}

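// ack commits the offset of the last record in the bulk. Kafka tracks one
// committed offset per partition, so committing the last record's offset
// implicitly acknowledges every earlier record from the same partition; if
// the commit fails, the bulk stays uncommitted and will be redelivered.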
func (w *MainWorker) ack(ctx context.Context, messages []*kafkaclient.Record) {
	message := messages[len(messages)-1]
	err := w.consumer.Ack(ctx, message)
	logger := w.logger
	if err != nil {
		logger.Error(err, "failed to ack bulk by last message; the whole bulk will be retried soon")

		w.metrics.KafkaFailedAcksTotal.WithLabelValues(w.topic).Inc()
	} else {
		logger.Info("successfully acked last message in bulk")
		for _, msg := range messages {
			msgID := getSuccessfulMessagesID(msg)
			if msgID != "" {
				logger.Info("successfully acked message", "id", msgID)
			}
		}
		w.metrics.KafkaSuccessfulAcksTotal.WithLabelValues(w.topic).Inc()
		w.metrics.KafkaLastAckedMessageGauge.
			WithLabelValues(w.topic, strconv.FormatInt(int64(message.Partition), 10)).Set(float64(message.Offset))
	}
}

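// getFailedMessagesID extracts the Kafka source IDs (topic, partition,
// offset) from the publish failure results, so the original records can be
// looked up again with filterMessagesByIDs.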
func getFailedMessagesID(failures []*pubsub.PublishFailureResult) []model.SourceID {
	result := make([]model.SourceID, 0)

	for _, failure := range failures {
		id := model.SourceID{
			Topic:     failure.Message.ID.Topic,
			Partition: failure.Message.ID.Partition,
			Offset:    failure.Message.ID.Offset,
		}

		result = append(result, id)
	}

	return result
}

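// filterMessagesByIDs returns the records whose (topic, partition, offset)
// matches one of the given source IDs.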
func filterMessagesByIDs(messages []*kafkaclient.Record, ids []model.SourceID) []*kafkaclient.Record {
	result := make([]*kafkaclient.Record, 0)

	for _, id := range ids {
		for _, msg := range messages {
			if msg.Topic == id.Topic &&
				msg.Partition == id.Partition &&
				msg.Offset == id.Offset {
				result = append(result, msg)
			}
		}
	}

	return result
}

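// getSuccessfulMessagesID returns the value of the record's "id" header, or
// an empty string if the header is absent.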
func getSuccessfulMessagesID(message *kafkaclient.Record) string {
	for _, header := range message.Headers {
		if header.Key == "id" {
			return string(header.Value)
		}
	}
	return ""
}
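
Typical wiring of this worker, as a minimal sketch: construct it via NewMainWorker, run Start on a goroutine (it blocks in its consume-publish loop), and call Stop on shutdown. The constructors for the consumer, publisher, failure handler, and metrics below are assumptions for illustration; only NewMainWorker, Start, and Stop are defined in this file.

func runWorkerExample() {
	// All four constructors below are hypothetical placeholders; substitute
	// the real ones from kafkaclient, pubsub, handler, and this package.
	consumer := kafkaclient.NewConsumer("store-events")
	publisher := pubsub.NewPublisher("store-events")
	failureHandler := handler.NewFailureHandler()
	metrics := NewMetrics()

	w := NewMainWorker("store-events", consumer, publisher, failureHandler, metrics)

	// Start blocks for the life of the worker, so run it on its own goroutine.
	go w.Start()

	// ... later, on shutdown:
	w.Stop()
}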
