...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/persister/files/partitionBalancer.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/persister/files

     1  package files
     2  
     3  import (
     4  	"crypto/rand"
     5  	"fmt"
     6  	"math/big"
     7  	"os"
     8  
     9  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    10  )
    11  
    12  type PartitionBalancer struct {
    13  	route       string
    14  	messageType string
    15  	outboxPath  string
    16  	bulkSize    int
    17  	partition   int
    18  }
    19  
    20  func NewPartitionBalancer(messageType string, partition int, chirpConfig *config.Config) *PartitionBalancer {
    21  	messageRoute := config.GetRouteConfig(messageType, chirpConfig)
    22  	route := messageRoute.Route
    23  	outboxPath := fmt.Sprintf("%s/%s", config.GetOutboxPath(chirpConfig), route)
    24  
    25  	return &PartitionBalancer{
    26  		messageType: messageType,
    27  		route:       route,
    28  		outboxPath:  outboxPath,
    29  		bulkSize:    messageRoute.BulkSize,
    30  		partition:   partition,
    31  	}
    32  }
    33  
    34  type PartitionInfo struct {
    35  	Path       string
    36  	FilesCount int
    37  }
    38  
    39  func (b *PartitionBalancer) getNextPartition() *PartitionInfo {
    40  	firstPartition := b.getPartitationInfo(0)
    41  	if firstPartition.FilesCount > 2*b.bulkSize {
    42  		return b.getPartitionForBalancing()
    43  	}
    44  
    45  	return firstPartition
    46  }
    47  
    48  func (b *PartitionBalancer) getPartitationInfo(partition int) *PartitionInfo {
    49  	partitionPath := fmt.Sprintf("%s/%d", b.outboxPath, partition)
    50  	files, _ := os.ReadDir(partitionPath)
    51  
    52  	return &PartitionInfo{Path: partitionPath, FilesCount: len(files)}
    53  }
    54  
    55  // randomly returns a partition for balancing
    56  func (b *PartitionBalancer) getPartitionForBalancing() *PartitionInfo {
    57  	partitions := b.partition
    58  
    59  	randomPartition, err := rand.Int(rand.Reader, big.NewInt(int64(partitions)))
    60  	if err != nil {
    61  		fmt.Println("error:", err)
    62  	}
    63  	// fmt.Printf("random number: %d\n", n.Int64())
    64  
    65  	return getPartitationInfo(b.outboxPath, int(randomPartition.Int64()))
    66  }
    67  
    68  func getPartitationInfo(messageTypeOutboxPath string, partition int) *PartitionInfo {
    69  	partitionPath := fmt.Sprintf("%s/%d", messageTypeOutboxPath, partition)
    70  	files, _ := os.ReadDir(partitionPath)
    71  
    72  	return &PartitionInfo{Path: partitionPath, FilesCount: len(files)}
    73  }
    74  

View as plain text