1 package files
2
3 import (
4 "fmt"
5 "os"
6 "strconv"
7 "strings"
8 "time"
9
10 "github.com/go-logr/logr"
11
12 "edge-infra.dev/pkg/lib/fog"
13
14 "edge-infra.dev/pkg/edge/datasync/chirp/model"
15 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
16 "edge-infra.dev/pkg/edge/datasync/internal/config"
17 "edge-infra.dev/pkg/edge/datasync/internal/signature"
18 )
19
20 type filePersister struct {
21 balancers map[string]*PartitionBalancer
22 logger logr.Logger
23 partition int
24 chirpConfig *config.Config
25 }
26
27 func NewFilePersister(cfg *config.Config) (persister.MessagePersister, error) {
28 createOutboxFolders(cfg)
29
30
31
32
33 partitionBalancers := make(map[string]*PartitionBalancer)
34
35 return &filePersister{
36 balancers: partitionBalancers,
37 logger: fog.New(),
38 partition: cfg.Partition,
39 chirpConfig: cfg,
40 }, nil
41 }
42
43 func (p *filePersister) Save(message model.MessageData) error {
44 balancer := p.getOrCreatePartitionBalancer(message.Type)
45 partitionInfo := balancer.getNextPartition()
46
47 fileContent, err := createMessageFile(message, partitionInfo.Path, p.chirpConfig)
48 if err != nil {
49 return fmt.Errorf("failed to create message file. %v", err.Error())
50 }
51
52 var genratedSignature string
53 creationTimeStamp := time.Now().UTC()
54
55 shouldSignMessage := config.GetRouteConfig(message.Type, p.chirpConfig).Signing
56
57 if shouldSignMessage {
58 msgContext := signature.MessageSignatureContext{
59 MessageID: message.ID.String(),
60 MessageType: message.Type,
61 Payload: fileContent,
62 Timestamp: creationTimeStamp,
63 }
64
65 genratedSignature, err = signMessage(msgContext)
66
67 if err != nil {
68 p.logger.Error(err, "failed to sign message. message will not be persisted")
69
70
71
72
73
74
75 return fmt.Errorf("failed to sign message. message will not be persisted (type: %v, id: %v). %v",
76 msgContext.MessageType, msgContext.MessageID, err.Error())
77 }
78 }
79
80 metadata := &model.MessageMetadata{
81 MessageType: message.Type,
82 CreatedAt: creationTimeStamp,
83 TenantID: config.OrganizationName(),
84 OrganizationName: config.OrganizationName(),
85 OrganizationID: config.OrganizationID(),
86 SiteID: config.SiteID(),
87 SiteName: config.SiteName(),
88 Signature: genratedSignature,
89 }
90
91 errMetadata := createMetadateFile(message.ID, metadata, partitionInfo.Path)
92 if errMetadata != nil {
93 return fmt.Errorf("failed to create message metadata file. %v", err.Error())
94 }
95
96 p.logger.Info("successfully created message files")
97
98 return nil
99 }
100
101 func (p *filePersister) Delete(messages []model.Message) error {
102 var failureCount int
103
104 for _, message := range messages {
105 err := deleteMessageFiles(message.FilePath)
106
107 if err != nil {
108 p.logger.Error(err, "failed to delete message files")
109 failureCount++
110 }
111 }
112
113 if failureCount > 0 {
114
115
116 err := fmt.Errorf("failed to delete %d messages", failureCount)
117 p.logger.Error(err, "failed to delete message files")
118 return err
119 }
120
121 return nil
122 }
123
124 func deleteMessageFiles(pathToDsmFile string) error {
125 err := os.Remove(pathToDsmFile)
126 if err != nil {
127 return fmt.Errorf("failed to delete %v", pathToDsmFile)
128 }
129
130 pathToMsgFile := strings.ReplaceAll(pathToDsmFile, ".dsm", ".msg")
131 err = os.Remove(pathToMsgFile)
132 if err != nil {
133 return fmt.Errorf("failed to delete %v", pathToMsgFile)
134 }
135
136 return nil
137 }
138
139 func (p *filePersister) getOrCreatePartitionBalancer(messageType string) *PartitionBalancer {
140 balancer, ok := p.balancers[messageType]
141 if !ok {
142 balancer = NewPartitionBalancer(messageType, p.partition, p.chirpConfig)
143 p.balancers[messageType] = balancer
144 }
145
146 return balancer
147 }
148
149 func signMessage(msgContext signature.MessageSignatureContext) (string, error) {
150 sharedKey := config.GetSharedKey()
151 secretKey := config.GetSecretKey()
152 organizationName := config.OrganizationName()
153
154 genratedSignature, err := signature.Generate(sharedKey, secretKey, msgContext, organizationName)
155 if err != nil {
156 return "", fmt.Errorf("failed to sign message. %v", err.Error())
157 }
158
159 return genratedSignature, nil
160 }
161
162 func createOutboxFolders(cfg *config.Config) {
163 logger := fog.New()
164
165 messageTypes := config.GetRoutes(cfg)
166 partitions := cfg.Partition
167
168 logger.Info("creating outboxes", "outboxes", messageTypes)
169
170 for _, outbox := range messageTypes {
171 for partition := 0; partition < partitions; partition++ {
172 pathToFile := fmt.Sprintf("%s/%s/%s", config.GetOutboxPath(cfg), outbox, strconv.Itoa(partition))
173 err := os.MkdirAll(pathToFile, 0755)
174
175 if err != nil {
176 logger.Error(err, "failed to create outbox", "outbox pathToFile", pathToFile)
177 }
178 }
179 }
180
181 logger.Info("created outboxes successfully")
182 }
183
View as plain text