...

Source file src/edge-infra.dev/pkg/edge/datasync/internal/outbox/file-repository.go

Documentation: edge-infra.dev/pkg/edge/datasync/internal/outbox

     1  package outbox
     2  
     3  import (
     4  	"encoding/json"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"os"
     9  	"path"
    10  	"path/filepath"
    11  	"regexp"
    12  	"strconv"
    13  	"strings"
    14  
    15  	"github.com/google/uuid"
    16  
    17  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
    18  	"edge-infra.dev/pkg/lib/logging"
    19  )
    20  
    21  type fileRepository struct {
    22  	outboxPath   string
    23  	messagesType string
    24  	partition    int
    25  	fullPath     string
    26  	logger       *logging.EdgeLogger
    27  }
    28  
    29  func NewFileRepository(pathToOutbox string, msgType string, partitionNumber int) MessageRepository {
    30  	fullPath := fmt.Sprintf("%s/%s/%s", pathToOutbox, msgType, strconv.Itoa(partitionNumber))
    31  
    32  	fr := &fileRepository{
    33  		outboxPath:   pathToOutbox,
    34  		messagesType: msgType,
    35  		partition:    partitionNumber,
    36  		fullPath:     fullPath,
    37  		logger:       logging.NewLogger(),
    38  	}
    39  
    40  	return fr
    41  }
    42  
    43  func (fr *fileRepository) GetMessages(count int) map[string]model.Message {
    44  	result := make(map[string]model.Message, count)
    45  
    46  	dictionary := fr.getMessagePairs(count)
    47  
    48  	for k, v := range dictionary {
    49  		if v.Payload == nil {
    50  			continue
    51  		}
    52  
    53  		message := fr.convertToMessage(k, v)
    54  		result[k] = *message
    55  	}
    56  
    57  	return result
    58  }
    59  
    60  func (fr *fileRepository) getMatchingFiles(pattern, matchingExt string, count int) map[os.FileInfo]os.FileInfo {
    61  	matchingFiles := make(map[os.FileInfo]os.FileInfo, count)
    62  	_ = filepath.Walk(fr.fullPath, func(filePath string, f os.FileInfo, _ error) error {
    63  		if f == nil || f.IsDir() {
    64  			return nil // ignore: we want files only
    65  		}
    66  		matched, err := regexp.MatchString(pattern, f.Name())
    67  		if err != nil || !matched {
    68  			return nil // ignore: file name not matching
    69  		}
    70  		ext := path.Ext(filePath)
    71  		matchingFilePath := filePath[0:len(filePath)-len(ext)] + matchingExt
    72  		fi, err := os.Stat(matchingFilePath)
    73  		if err != nil {
    74  			return nil // ignore: file can be read or not exist
    75  		}
    76  		matchingFiles[f] = fi
    77  		if len(matchingFiles) == count {
    78  			return io.EOF
    79  		}
    80  		return nil
    81  	})
    82  
    83  	// if err == io.EOF {
    84  	// 	err = nil
    85  	// }
    86  
    87  	return matchingFiles
    88  }
    89  
    90  func (fr *fileRepository) getMessagePairs(count int) map[string]model.MessagePair {
    91  	matchingFiles := fr.getMatchingFiles(".dsm", ".msg", count)
    92  
    93  	filesMap := make(map[string]model.MessagePair, len(matchingFiles))
    94  
    95  	for meta, msg := range matchingFiles {
    96  		key := GetMessageID(meta)
    97  		filesMap[key] = model.MessagePair{Metadata: meta, Payload: msg}
    98  	}
    99  
   100  	return filesMap
   101  }
   102  
   103  func GetMessageID(f os.FileInfo) string {
   104  	name := f.Name()
   105  	result := strings.Split(name, "_")
   106  	result = strings.Split(result[1], ".")
   107  	return result[0]
   108  }
   109  
   110  func GetFileMessageType(f os.FileInfo) string {
   111  	name := f.Name()
   112  	result := strings.Split(name, "_")
   113  	return result[0]
   114  }
   115  
   116  func (fr *fileRepository) convertToMessage(messageID string, pair model.MessagePair) *model.Message {
   117  	pathToMetadataFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Metadata.Name())
   118  	file, _ := os.ReadFile(pathToMetadataFile)
   119  
   120  	data := model.MessageMetadata{}
   121  	err := json.Unmarshal(file, &data)
   122  
   123  	if err != nil {
   124  		fr.logger.Error(err, "failed to read metadata file")
   125  	}
   126  
   127  	pathToMessageFile := fmt.Sprintf("%s/%s", fr.fullPath, pair.Payload.Name())
   128  
   129  	payload, err := readFileContent(pathToMessageFile)
   130  	if err != nil {
   131  		fr.logger.Error(err, "failed to read message file")
   132  	}
   133  
   134  	msgID, _ := uuid.Parse(messageID)
   135  
   136  	return &model.Message{
   137  		ID:               msgID,
   138  		Type:             data.MessageType,
   139  		TenantID:         data.TenantID,
   140  		OrganizationID:   data.OrganizationID,
   141  		OrganizationName: data.OrganizationName,
   142  		SiteID:           data.SiteID,
   143  		SiteName:         data.SiteName,
   144  		CreatedAt:        data.CreatedAt,
   145  		Signature:        data.Signature,
   146  		FilePath:         pathToMetadataFile,
   147  		Payload:          payload,
   148  	}
   149  }
   150  
   151  func readFileContent(fullPath string) ([]byte, error) {
   152  	file, err := os.Open(fullPath)
   153  	if err != nil {
   154  		return nil, errors.New("failed to open file")
   155  	}
   156  
   157  	payload, err := io.ReadAll(file)
   158  	if err != nil {
   159  		return nil, errors.New("failed to read file content")
   160  	}
   161  
   162  	return payload, nil
   163  }
   164  

View as plain text