package files import ( "fmt" "os" "strconv" "strings" "time" "github.com/go-logr/logr" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/edge/datasync/chirp/model" "edge-infra.dev/pkg/edge/datasync/chirp/persister" "edge-infra.dev/pkg/edge/datasync/internal/config" "edge-infra.dev/pkg/edge/datasync/internal/signature" ) type filePersister struct { balancers map[string]*PartitionBalancer logger logr.Logger partition int chirpConfig *config.Config } func NewFilePersister(cfg *config.Config) (persister.MessagePersister, error) { createOutboxFolders(cfg) // if err != nil { // return nil, err // } partitionBalancers := make(map[string]*PartitionBalancer) return &filePersister{ balancers: partitionBalancers, logger: fog.New(), partition: cfg.Partition, chirpConfig: cfg, }, nil } func (p *filePersister) Save(message model.MessageData) error { balancer := p.getOrCreatePartitionBalancer(message.Type) partitionInfo := balancer.getNextPartition() fileContent, err := createMessageFile(message, partitionInfo.Path, p.chirpConfig) if err != nil { return fmt.Errorf("failed to create message file. %v", err.Error()) } var genratedSignature string creationTimeStamp := time.Now().UTC() shouldSignMessage := config.GetRouteConfig(message.Type, p.chirpConfig).Signing if shouldSignMessage { msgContext := signature.MessageSignatureContext{ MessageID: message.ID.String(), MessageType: message.Type, Payload: fileContent, Timestamp: creationTimeStamp, } genratedSignature, err = signMessage(msgContext) if err != nil { p.logger.Error(err, "failed to sign message. message will not be persisted") // p.logger.WithFields(map[string]interface{}{ // "source": "file-persister.SaveMessage", // "messageType": msgContext.MessageType, // "messageId": msgContext.MessageId, // }).Error("failed to sign message. message will not be persisted") return fmt.Errorf("failed to sign message. message will not be persisted (type: %v, id: %v). %v", msgContext.MessageType, msgContext.MessageID, err.Error()) } } metadata := &model.MessageMetadata{ MessageType: message.Type, CreatedAt: creationTimeStamp, TenantID: config.OrganizationName(), OrganizationName: config.OrganizationName(), OrganizationID: config.OrganizationID(), SiteID: config.SiteID(), SiteName: config.SiteName(), Signature: genratedSignature, } errMetadata := createMetadateFile(message.ID, metadata, partitionInfo.Path) if errMetadata != nil { return fmt.Errorf("failed to create message metadata file. %v", err.Error()) } p.logger.Info("successfully created message files") return nil } func (p *filePersister) Delete(messages []model.Message) error { var failureCount int for _, message := range messages { err := deleteMessageFiles(message.FilePath) if err != nil { p.logger.Error(err, "failed to delete message files") failureCount++ } } if failureCount > 0 { //TODO: what happens when message was sent but failed to be deleted // currently it will be sent again err := fmt.Errorf("failed to delete %d messages", failureCount) p.logger.Error(err, "failed to delete message files") return err } return nil } func deleteMessageFiles(pathToDsmFile string) error { err := os.Remove(pathToDsmFile) if err != nil { return fmt.Errorf("failed to delete %v", pathToDsmFile) } pathToMsgFile := strings.ReplaceAll(pathToDsmFile, ".dsm", ".msg") err = os.Remove(pathToMsgFile) if err != nil { return fmt.Errorf("failed to delete %v", pathToMsgFile) } return nil } func (p *filePersister) getOrCreatePartitionBalancer(messageType string) *PartitionBalancer { balancer, ok := p.balancers[messageType] if !ok { balancer = NewPartitionBalancer(messageType, p.partition, p.chirpConfig) p.balancers[messageType] = balancer } return balancer } func signMessage(msgContext signature.MessageSignatureContext) (string, error) { sharedKey := config.GetSharedKey() secretKey := config.GetSecretKey() organizationName := config.OrganizationName() genratedSignature, err := signature.Generate(sharedKey, secretKey, msgContext, organizationName) if err != nil { return "", fmt.Errorf("failed to sign message. %v", err.Error()) } return genratedSignature, nil } func createOutboxFolders(cfg *config.Config) { logger := fog.New() messageTypes := config.GetRoutes(cfg) partitions := cfg.Partition logger.Info("creating outboxes", "outboxes", messageTypes) for _, outbox := range messageTypes { for partition := 0; partition < partitions; partition++ { pathToFile := fmt.Sprintf("%s/%s/%s", config.GetOutboxPath(cfg), outbox, strconv.Itoa(partition)) err := os.MkdirAll(pathToFile, 0755) if err != nil { logger.Error(err, "failed to create outbox", "outbox pathToFile", pathToFile) } } } logger.Info("created outboxes successfully") }