...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/persister/files/file-persister.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/persister/files

     1  package files
     2  
     3  import (
     4  	"fmt"
     5  	"os"
     6  	"strconv"
     7  	"strings"
     8  	"time"
     9  
    10  	"github.com/go-logr/logr"
    11  
    12  	"edge-infra.dev/pkg/lib/fog"
    13  
    14  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
    15  	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
    16  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    17  	"edge-infra.dev/pkg/edge/datasync/internal/signature"
    18  )
    19  
    20  type filePersister struct {
    21  	balancers   map[string]*PartitionBalancer
    22  	logger      logr.Logger
    23  	partition   int
    24  	chirpConfig *config.Config
    25  }
    26  
    27  func NewFilePersister(cfg *config.Config) (persister.MessagePersister, error) {
    28  	createOutboxFolders(cfg)
    29  	// if err != nil {
    30  	// 	return nil, err
    31  	// }
    32  
    33  	partitionBalancers := make(map[string]*PartitionBalancer)
    34  
    35  	return &filePersister{
    36  		balancers:   partitionBalancers,
    37  		logger:      fog.New(),
    38  		partition:   cfg.Partition,
    39  		chirpConfig: cfg,
    40  	}, nil
    41  }
    42  
    43  func (p *filePersister) Save(message model.MessageData) error {
    44  	balancer := p.getOrCreatePartitionBalancer(message.Type)
    45  	partitionInfo := balancer.getNextPartition()
    46  
    47  	fileContent, err := createMessageFile(message, partitionInfo.Path, p.chirpConfig)
    48  	if err != nil {
    49  		return fmt.Errorf("failed to create message file. %v", err.Error())
    50  	}
    51  
    52  	var genratedSignature string
    53  	creationTimeStamp := time.Now().UTC()
    54  
    55  	shouldSignMessage := config.GetRouteConfig(message.Type, p.chirpConfig).Signing
    56  
    57  	if shouldSignMessage {
    58  		msgContext := signature.MessageSignatureContext{
    59  			MessageID:   message.ID.String(),
    60  			MessageType: message.Type,
    61  			Payload:     fileContent,
    62  			Timestamp:   creationTimeStamp,
    63  		}
    64  
    65  		genratedSignature, err = signMessage(msgContext)
    66  
    67  		if err != nil {
    68  			p.logger.Error(err, "failed to sign message. message will not be persisted")
    69  			// p.logger.WithFields(map[string]interface{}{
    70  			// 	"source":      "file-persister.SaveMessage",
    71  			// 	"messageType": msgContext.MessageType,
    72  			// 	"messageId":   msgContext.MessageId,
    73  			// }).Error("failed to sign message. message will not be persisted")
    74  
    75  			return fmt.Errorf("failed to sign message. message will not be persisted (type: %v, id: %v). %v",
    76  				msgContext.MessageType, msgContext.MessageID, err.Error())
    77  		}
    78  	}
    79  
    80  	metadata := &model.MessageMetadata{
    81  		MessageType:      message.Type,
    82  		CreatedAt:        creationTimeStamp,
    83  		TenantID:         config.OrganizationName(),
    84  		OrganizationName: config.OrganizationName(),
    85  		OrganizationID:   config.OrganizationID(),
    86  		SiteID:           config.SiteID(),
    87  		SiteName:         config.SiteName(),
    88  		Signature:        genratedSignature,
    89  	}
    90  
    91  	errMetadata := createMetadateFile(message.ID, metadata, partitionInfo.Path)
    92  	if errMetadata != nil {
    93  		return fmt.Errorf("failed to create message metadata file. %v", err.Error())
    94  	}
    95  
    96  	p.logger.Info("successfully created message files")
    97  
    98  	return nil
    99  }
   100  
   101  func (p *filePersister) Delete(messages []model.Message) error {
   102  	var failureCount int
   103  
   104  	for _, message := range messages {
   105  		err := deleteMessageFiles(message.FilePath)
   106  
   107  		if err != nil {
   108  			p.logger.Error(err, "failed to delete message files")
   109  			failureCount++
   110  		}
   111  	}
   112  
   113  	if failureCount > 0 {
   114  		//TODO: what happens when message was sent but failed to be deleted
   115  		// currently it will be sent again
   116  		err := fmt.Errorf("failed to delete %d messages", failureCount)
   117  		p.logger.Error(err, "failed to delete message files")
   118  		return err
   119  	}
   120  
   121  	return nil
   122  }
   123  
   124  func deleteMessageFiles(pathToDsmFile string) error {
   125  	err := os.Remove(pathToDsmFile)
   126  	if err != nil {
   127  		return fmt.Errorf("failed to delete %v", pathToDsmFile)
   128  	}
   129  
   130  	pathToMsgFile := strings.ReplaceAll(pathToDsmFile, ".dsm", ".msg")
   131  	err = os.Remove(pathToMsgFile)
   132  	if err != nil {
   133  		return fmt.Errorf("failed to delete %v", pathToMsgFile)
   134  	}
   135  
   136  	return nil
   137  }
   138  
   139  func (p *filePersister) getOrCreatePartitionBalancer(messageType string) *PartitionBalancer {
   140  	balancer, ok := p.balancers[messageType]
   141  	if !ok {
   142  		balancer = NewPartitionBalancer(messageType, p.partition, p.chirpConfig)
   143  		p.balancers[messageType] = balancer
   144  	}
   145  
   146  	return balancer
   147  }
   148  
   149  func signMessage(msgContext signature.MessageSignatureContext) (string, error) {
   150  	sharedKey := config.GetSharedKey()
   151  	secretKey := config.GetSecretKey()
   152  	organizationName := config.OrganizationName()
   153  
   154  	genratedSignature, err := signature.Generate(sharedKey, secretKey, msgContext, organizationName)
   155  	if err != nil {
   156  		return "", fmt.Errorf("failed to sign message. %v", err.Error())
   157  	}
   158  
   159  	return genratedSignature, nil
   160  }
   161  
   162  func createOutboxFolders(cfg *config.Config) {
   163  	logger := fog.New()
   164  
   165  	messageTypes := config.GetRoutes(cfg)
   166  	partitions := cfg.Partition
   167  
   168  	logger.Info("creating outboxes", "outboxes", messageTypes)
   169  
   170  	for _, outbox := range messageTypes {
   171  		for partition := 0; partition < partitions; partition++ {
   172  			pathToFile := fmt.Sprintf("%s/%s/%s", config.GetOutboxPath(cfg), outbox, strconv.Itoa(partition))
   173  			err := os.MkdirAll(pathToFile, 0755)
   174  
   175  			if err != nil {
   176  				logger.Error(err, "failed to create outbox", "outbox pathToFile", pathToFile)
   177  			}
   178  		}
   179  	}
   180  
   181  	logger.Info("created outboxes successfully")
   182  }
   183  

View as plain text