...

Source file src/edge-infra.dev/pkg/edge/datasync/shoot/worker/message_converter.go

Documentation: edge-infra.dev/pkg/edge/datasync/shoot/worker

     1  package worker
     2  
     3  import (
     4  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
     5  	"edge-infra.dev/pkg/edge/datasync/shoot/model"
     6  )
     7  
     8  // ConvertToMessagesModel converts Kafka messages into model messages.
     9  func ConvertToMessagesModel(kafkaMessages []*kafkaclient.Record) []*model.Message {
    10  	result := make([]*model.Message, 0)
    11  
    12  	for _, message := range kafkaMessages {
    13  		result = append(result, toMessageModel(message))
    14  	}
    15  
    16  	return result
    17  }
    18  
    19  func toMessageModel(kafkaMessage *kafkaclient.Record) *model.Message {
    20  	return &model.Message{
    21  		ID: model.SourceID{
    22  			Topic:     kafkaMessage.Topic,
    23  			Partition: kafkaMessage.Partition,
    24  			Offset:    kafkaMessage.Offset,
    25  		},
    26  		Headers: toHeadersModel(kafkaMessage.Headers),
    27  		Payload: kafkaMessage.Value,
    28  	}
    29  }
    30  
    31  func toHeadersModel(headers []kafkaclient.Header) map[string]string {
    32  	result := make(map[string]string)
    33  
    34  	for _, h := range headers {
    35  		result[h.Key] = string(h.Value)
    36  	}
    37  
    38  	return result
    39  }
    40  

View as plain text