package outbox import ( "encoding/json" "errors" "fmt" "io" "os" "path" "path/filepath" "regexp" "strconv" "strings" "github.com/google/uuid" "edge-infra.dev/pkg/edge/datasync/chirp/model" "edge-infra.dev/pkg/lib/logging" ) type fileRepository struct { outboxPath string messagesType string partition int fullPath string logger *logging.EdgeLogger } func NewFileRepository(pathToOutbox string, msgType string, partitionNumber int) MessageRepository { fullPath := fmt.Sprintf("%s/%s/%s", pathToOutbox, msgType, strconv.Itoa(partitionNumber)) fr := &fileRepository{ outboxPath: pathToOutbox, messagesType: msgType, partition: partitionNumber, fullPath: fullPath, logger: logging.NewLogger(), } return fr } func (fr *fileRepository) GetMessages(count int) map[string]model.Message { result := make(map[string]model.Message, count) dictionary := fr.getMessagePairs(count) for k, v := range dictionary { if v.Payload == nil { continue } message := fr.convertToMessage(k, v) result[k] = *message } return result } func (fr *fileRepository) getMatchingFiles(pattern, matchingExt string, count int) map[os.FileInfo]os.FileInfo { matchingFiles := make(map[os.FileInfo]os.FileInfo, count) _ = filepath.Walk(fr.fullPath, func(filePath string, f os.FileInfo, _ error) error { if f == nil || f.IsDir() { return nil // ignore: we want files only } matched, err := regexp.MatchString(pattern, f.Name()) if err != nil || !matched { return nil // ignore: file name not matching } ext := path.Ext(filePath) matchingFilePath := filePath[0:len(filePath)-len(ext)] + matchingExt fi, err := os.Stat(matchingFilePath) if err != nil { return nil // ignore: file can be read or not exist } matchingFiles[f] = fi if len(matchingFiles) == count { return io.EOF } return nil }) // if err == io.EOF { // err = nil // } return matchingFiles } func (fr *fileRepository) getMessagePairs(count int) map[string]model.MessagePair { matchingFiles := fr.getMatchingFiles(".dsm", ".msg", count) filesMap := make(map[string]model.MessagePair, len(matchingFiles)) for meta, msg := range matchingFiles { key := GetMessageID(meta) filesMap[key] = model.MessagePair{Metadata: meta, Payload: msg} } return filesMap } func GetMessageID(f os.FileInfo) string { name := f.Name() result := strings.Split(name, "_") result = strings.Split(result[1], ".") return result[0] } func GetFileMessageType(f os.FileInfo) string { name := f.Name() result := strings.Split(name, "_") return result[0] } func (fr *fileRepository) convertToMessage(messageID string, pair model.MessagePair) *model.Message { pathToMetadataFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Metadata.Name()) file, _ := os.ReadFile(pathToMetadataFile) data := model.MessageMetadata{} err := json.Unmarshal(file, &data) if err != nil { fr.logger.Error(err, "failed to read metadata file") } pathToMessageFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Payload.Name()) payload, err := readFileContent(pathToMessageFile) if err != nil { fr.logger.Error(err, "failed to read message file") } msgID, _ := uuid.Parse(messageID) return &model.Message{ ID: msgID, Type: data.MessageType, TenantID: data.TenantID, OrganizationID: data.OrganizationID, OrganizationName: data.OrganizationName, SiteID: data.SiteID, SiteName: data.SiteName, CreatedAt: data.CreatedAt, Signature: data.Signature, FilePath: pathToMetadataFile, Payload: payload, } } func readFileContent(fullPath string) ([]byte, error) { file, err := os.Open(fullPath) if err != nil { return nil, errors.New("failed to open file") } payload, err := io.ReadAll(file) if err != nil { return nil, errors.New("failed to read file content") } return payload, nil }