package worker import ( "edge-infra.dev/pkg/edge/datasync/kafkaclient" "edge-infra.dev/pkg/edge/datasync/shoot/model" ) // ConvertToMessagesModel converts Kafka messages into model messages. func ConvertToMessagesModel(kafkaMessages []*kafkaclient.Record) []*model.Message { result := make([]*model.Message, 0) for _, message := range kafkaMessages { result = append(result, toMessageModel(message)) } return result } func toMessageModel(kafkaMessage *kafkaclient.Record) *model.Message { return &model.Message{ ID: model.SourceID{ Topic: kafkaMessage.Topic, Partition: kafkaMessage.Partition, Offset: kafkaMessage.Offset, }, Headers: toHeadersModel(kafkaMessage.Headers), Payload: kafkaMessage.Value, } } func toHeadersModel(headers []kafkaclient.Header) map[string]string { result := make(map[string]string) for _, h := range headers { result[h.Key] = string(h.Value) } return result }