...
1 package files
2
3 import (
4 "encoding/json"
5 "fmt"
6 "os"
7 "strings"
8
9 "github.com/google/uuid"
10
11 "edge-infra.dev/pkg/edge/datasync/chirp/model"
12 gzip "edge-infra.dev/pkg/edge/datasync/internal/compressor"
13 "edge-infra.dev/pkg/edge/datasync/internal/config"
14 )
15
16 func createMessageFile(message model.MessageData, pathToPartition string, chirpConfig *config.Config) ([]byte, error) {
17 fileNameWithoutExt := fmt.Sprintf("%s_%s", message.Type, message.ID)
18 fullPathWithoutExt := fmt.Sprintf("%s/%s", pathToPartition, fileNameWithoutExt)
19
20 filePath := fullPathWithoutExt + ".msg"
21
22 shouldCompress := config.ShouldCompressMessage(message.Type, chirpConfig)
23
24 if shouldCompress {
25 err := gzip.CompressToFile(filePath, message.Payload)
26 if err != nil {
27 return nil, err
28 }
29
30 compressedContent, err := gzip.ReadCompressedContent(filePath)
31 if err != nil {
32 return nil, err
33 }
34
35 return compressedContent, nil
36 }
37
38 err := os.WriteFile(filePath, message.Payload, 0644)
39 if err != nil {
40 return nil, err
41 }
42
43 return message.Payload, nil
44 }
45
46 func createMetadateFile(messageID uuid.UUID, metadata *model.MessageMetadata, pathToPartition string) error {
47 fileNameWithoutExt := fmt.Sprintf("%s_%s", metadata.MessageType, messageID)
48 fullPathWithoutExt := fmt.Sprintf("%s/%s", pathToPartition, fileNameWithoutExt)
49
50 content, _ := json.MarshalIndent(metadata, "", " ")
51
52 tempExt := ".dsm.tmp"
53 tempPath := fullPathWithoutExt + tempExt
54
55 f, err := os.Create(tempPath)
56 if err != nil {
57 return err
58 }
59 defer f.Close()
60
61 err = os.WriteFile(tempPath, content, 0644)
62
63 if err != nil {
64 return err
65 }
66
67 dsmFullPath := strings.ReplaceAll(tempPath, tempExt, ".dsm")
68 err = os.Rename(tempPath, dsmFullPath)
69
70 if err != nil {
71 return err
72 }
73
74 return nil
75 }
76
View as plain text