...
1 package provider
2
3 import (
4 "edge-infra.dev/pkg/edge/datasync/chirp/model"
5
6 "edge-infra.dev/pkg/edge/datasync/internal/config"
7 repository "edge-infra.dev/pkg/edge/datasync/internal/outbox"
8 )
9
10 type fileSystemProvider struct {
11 partition int
12 outboxes []string
13 bulkSizePerOutbox map[string]int
14 configChirp *config.Config
15 }
16
17 func NewFileSystemProvider(partition int, chirpConfig *config.Config) MessageProvider {
18 return &fileSystemProvider{
19 partition: partition,
20 outboxes: config.GetRoutes(chirpConfig),
21 bulkSizePerOutbox: config.GetBulkSizes(chirpConfig),
22 configChirp: chirpConfig,
23 }
24 }
25
26 func (p *fileSystemProvider) GetMessages() map[string]model.Message {
27 for _, outbox := range p.outboxes {
28 var fileRepository = repository.NewFileRepository(config.GetOutboxPath(p.configChirp), outbox, p.partition)
29 bulkSize := p.bulkSizePerOutbox[outbox]
30 messages := fileRepository.GetMessages(bulkSize)
31
32 if len(messages) > 0 {
33 return messages
34 }
35 }
36
37 return nil
38 }
39
View as plain text