1 package outbox
2
3 import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "io"
8 "os"
9 "path"
10 "path/filepath"
11 "regexp"
12 "strconv"
13 "strings"
14
15 "github.com/google/uuid"
16
17 "edge-infra.dev/pkg/edge/datasync/chirp/model"
18 "edge-infra.dev/pkg/lib/logging"
19 )
20
21 type fileRepository struct {
22 outboxPath string
23 messagesType string
24 partition int
25 fullPath string
26 logger *logging.EdgeLogger
27 }
28
29 func NewFileRepository(pathToOutbox string, msgType string, partitionNumber int) MessageRepository {
30 fullPath := fmt.Sprintf("%s/%s/%s", pathToOutbox, msgType, strconv.Itoa(partitionNumber))
31
32 fr := &fileRepository{
33 outboxPath: pathToOutbox,
34 messagesType: msgType,
35 partition: partitionNumber,
36 fullPath: fullPath,
37 logger: logging.NewLogger(),
38 }
39
40 return fr
41 }
42
43 func (fr *fileRepository) GetMessages(count int) map[string]model.Message {
44 result := make(map[string]model.Message, count)
45
46 dictionary := fr.getMessagePairs(count)
47
48 for k, v := range dictionary {
49 if v.Payload == nil {
50 continue
51 }
52
53 message := fr.convertToMessage(k, v)
54 result[k] = *message
55 }
56
57 return result
58 }
59
60 func (fr *fileRepository) getMatchingFiles(pattern, matchingExt string, count int) map[os.FileInfo]os.FileInfo {
61 matchingFiles := make(map[os.FileInfo]os.FileInfo, count)
62 _ = filepath.Walk(fr.fullPath, func(filePath string, f os.FileInfo, _ error) error {
63 if f == nil || f.IsDir() {
64 return nil
65 }
66 matched, err := regexp.MatchString(pattern, f.Name())
67 if err != nil || !matched {
68 return nil
69 }
70 ext := path.Ext(filePath)
71 matchingFilePath := filePath[0:len(filePath)-len(ext)] + matchingExt
72 fi, err := os.Stat(matchingFilePath)
73 if err != nil {
74 return nil
75 }
76 matchingFiles[f] = fi
77 if len(matchingFiles) == count {
78 return io.EOF
79 }
80 return nil
81 })
82
83
84
85
86
87 return matchingFiles
88 }
89
90 func (fr *fileRepository) getMessagePairs(count int) map[string]model.MessagePair {
91 matchingFiles := fr.getMatchingFiles(".dsm", ".msg", count)
92
93 filesMap := make(map[string]model.MessagePair, len(matchingFiles))
94
95 for meta, msg := range matchingFiles {
96 key := GetMessageID(meta)
97 filesMap[key] = model.MessagePair{Metadata: meta, Payload: msg}
98 }
99
100 return filesMap
101 }
102
// GetMessageID extracts the message ID from a file name of the form
// "<type>_<id>.<ext>".
//
// The ID is the text that follows the first "_" and runs up to the next "_"
// or ".", whichever comes first (or to the end of the name). It returns the
// empty string when the name contains no "_" at all.
func GetMessageID(f os.FileInfo) string {
	name := f.Name()

	start := strings.Index(name, "_")
	if start < 0 {
		// No separator means there is no ID part to return.
		return ""
	}

	id := name[start+1:]
	if end := strings.IndexAny(id, "_."); end >= 0 {
		id = id[:end]
	}

	return id
}
109
// GetFileMessageType returns the part of the file name that precedes the
// first "_". A name without any "_" is returned unchanged.
func GetFileMessageType(f os.FileInfo) string {
	fileName := f.Name()

	if sep := strings.Index(fileName, "_"); sep >= 0 {
		return fileName[:sep]
	}

	return fileName
}
115
116 func (fr *fileRepository) convertToMessage(messageID string, pair model.MessagePair) *model.Message {
117 pathToMetadataFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Metadata.Name())
118 file, _ := os.ReadFile(pathToMetadataFile)
119
120 data := model.MessageMetadata{}
121 err := json.Unmarshal(file, &data)
122
123 if err != nil {
124 fr.logger.Error(err, "failed to read metadata file")
125 }
126
127 pathToMessageFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Payload.Name())
128
129 payload, err := readFileContent(pathToMessageFile)
130 if err != nil {
131 fr.logger.Error(err, "failed to read message file")
132 }
133
134 msgID, _ := uuid.Parse(messageID)
135
136 return &model.Message{
137 ID: msgID,
138 Type: data.MessageType,
139 TenantID: data.TenantID,
140 OrganizationID: data.OrganizationID,
141 OrganizationName: data.OrganizationName,
142 SiteID: data.SiteID,
143 SiteName: data.SiteName,
144 CreatedAt: data.CreatedAt,
145 Signature: data.Signature,
146 FilePath: pathToMetadataFile,
147 Payload: payload,
148 }
149 }
150
// readFileContent returns the entire content of the file at fullPath.
//
// The file is closed before the function returns. On failure the returned
// slice is nil and the error text starts with "failed to open file" or
// "failed to read file content", followed by the underlying cause.
func readFileContent(fullPath string) ([]byte, error) {
	file, err := os.Open(fullPath)
	if err != nil {
		return nil, errors.New("failed to open file: " + err.Error())
	}
	// Release the descriptor on every return path. The Close error is ignored
	// because the file was only read from.
	defer file.Close()

	payload, err := io.ReadAll(file)
	if err != nil {
		return nil, errors.New("failed to read file content: " + err.Error())
	}

	return payload, nil
}
164
View as plain text