...
1 package files
2
3 import (
4 "crypto/rand"
5 "fmt"
6 "math/big"
7 "os"
8
9 "edge-infra.dev/pkg/edge/datasync/internal/config"
10 )
11
12 type PartitionBalancer struct {
13 route string
14 messageType string
15 outboxPath string
16 bulkSize int
17 partition int
18 }
19
20 func NewPartitionBalancer(messageType string, partition int, chirpConfig *config.Config) *PartitionBalancer {
21 messageRoute := config.GetRouteConfig(messageType, chirpConfig)
22 route := messageRoute.Route
23 outboxPath := fmt.Sprintf("%s/%s", config.GetOutboxPath(chirpConfig), route)
24
25 return &PartitionBalancer{
26 messageType: messageType,
27 route: route,
28 outboxPath: outboxPath,
29 bulkSize: messageRoute.BulkSize,
30 partition: partition,
31 }
32 }
33
34 type PartitionInfo struct {
35 Path string
36 FilesCount int
37 }
38
39 func (b *PartitionBalancer) getNextPartition() *PartitionInfo {
40 firstPartition := b.getPartitationInfo(0)
41 if firstPartition.FilesCount > 2*b.bulkSize {
42 return b.getPartitionForBalancing()
43 }
44
45 return firstPartition
46 }
47
48 func (b *PartitionBalancer) getPartitationInfo(partition int) *PartitionInfo {
49 partitionPath := fmt.Sprintf("%s/%d", b.outboxPath, partition)
50 files, _ := os.ReadDir(partitionPath)
51
52 return &PartitionInfo{Path: partitionPath, FilesCount: len(files)}
53 }
54
55
56 func (b *PartitionBalancer) getPartitionForBalancing() *PartitionInfo {
57 partitions := b.partition
58
59 randomPartition, err := rand.Int(rand.Reader, big.NewInt(int64(partitions)))
60 if err != nil {
61 fmt.Println("error:", err)
62 }
63
64
65 return getPartitationInfo(b.outboxPath, int(randomPartition.Int64()))
66 }
67
68 func getPartitationInfo(messageTypeOutboxPath string, partition int) *PartitionInfo {
69 partitionPath := fmt.Sprintf("%s/%d", messageTypeOutboxPath, partition)
70 files, _ := os.ReadDir(partitionPath)
71
72 return &PartitionInfo{Path: partitionPath, FilesCount: len(files)}
73 }
74
View as plain text