package files import ( "crypto/rand" "fmt" "math/big" "os" "edge-infra.dev/pkg/edge/datasync/internal/config" ) type PartitionBalancer struct { route string messageType string outboxPath string bulkSize int partition int } func NewPartitionBalancer(messageType string, partition int, chirpConfig *config.Config) *PartitionBalancer { messageRoute := config.GetRouteConfig(messageType, chirpConfig) route := messageRoute.Route outboxPath := fmt.Sprintf("%s/%s", config.GetOutboxPath(chirpConfig), route) return &PartitionBalancer{ messageType: messageType, route: route, outboxPath: outboxPath, bulkSize: messageRoute.BulkSize, partition: partition, } } type PartitionInfo struct { Path string FilesCount int } func (b *PartitionBalancer) getNextPartition() *PartitionInfo { firstPartition := b.getPartitationInfo(0) if firstPartition.FilesCount > 2*b.bulkSize { return b.getPartitionForBalancing() } return firstPartition } func (b *PartitionBalancer) getPartitationInfo(partition int) *PartitionInfo { partitionPath := fmt.Sprintf("%s/%d", b.outboxPath, partition) files, _ := os.ReadDir(partitionPath) return &PartitionInfo{Path: partitionPath, FilesCount: len(files)} } // randomly returns a partition for balancing func (b *PartitionBalancer) getPartitionForBalancing() *PartitionInfo { partitions := b.partition randomPartition, err := rand.Int(rand.Reader, big.NewInt(int64(partitions))) if err != nil { fmt.Println("error:", err) } // fmt.Printf("random number: %d\n", n.Int64()) return getPartitationInfo(b.outboxPath, int(randomPartition.Int64())) } func getPartitationInfo(messageTypeOutboxPath string, partition int) *PartitionInfo { partitionPath := fmt.Sprintf("%s/%d", messageTypeOutboxPath, partition) files, _ := os.ReadDir(partitionPath) return &PartitionInfo{Path: partitionPath, FilesCount: len(files)} }