...
1 package worker
2
3 import (
4 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
5 "edge-infra.dev/pkg/edge/datasync/shoot/model"
6 )
7
8
9 func ConvertToMessagesModel(kafkaMessages []*kafkaclient.Record) []*model.Message {
10 result := make([]*model.Message, 0)
11
12 for _, message := range kafkaMessages {
13 result = append(result, toMessageModel(message))
14 }
15
16 return result
17 }
18
19 func toMessageModel(kafkaMessage *kafkaclient.Record) *model.Message {
20 return &model.Message{
21 ID: model.SourceID{
22 Topic: kafkaMessage.Topic,
23 Partition: kafkaMessage.Partition,
24 Offset: kafkaMessage.Offset,
25 },
26 Headers: toHeadersModel(kafkaMessage.Headers),
27 Payload: kafkaMessage.Value,
28 }
29 }
30
31 func toHeadersModel(headers []kafkaclient.Header) map[string]string {
32 result := make(map[string]string)
33
34 for _, h := range headers {
35 result[h.Key] = string(h.Value)
36 }
37
38 return result
39 }
40
View as plain text