...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/provider/fileSystem-provider.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/provider

     1  package provider
     2  
     3  import (
     4  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
     5  
     6  	"edge-infra.dev/pkg/edge/datasync/internal/config"
     7  	repository "edge-infra.dev/pkg/edge/datasync/internal/outbox"
     8  )
     9  
    10  type fileSystemProvider struct {
    11  	partition         int
    12  	outboxes          []string
    13  	bulkSizePerOutbox map[string]int
    14  	configChirp       *config.Config
    15  }
    16  
    17  func NewFileSystemProvider(partition int, chirpConfig *config.Config) MessageProvider {
    18  	return &fileSystemProvider{
    19  		partition:         partition,
    20  		outboxes:          config.GetRoutes(chirpConfig),
    21  		bulkSizePerOutbox: config.GetBulkSizes(chirpConfig),
    22  		configChirp:       chirpConfig,
    23  	}
    24  }
    25  
    26  func (p *fileSystemProvider) GetMessages() map[string]model.Message {
    27  	for _, outbox := range p.outboxes {
    28  		var fileRepository = repository.NewFileRepository(config.GetOutboxPath(p.configChirp), outbox, p.partition)
    29  		bulkSize := p.bulkSizePerOutbox[outbox]
    30  		messages := fileRepository.GetMessages(bulkSize)
    31  
    32  		if len(messages) > 0 {
    33  			return messages
    34  		}
    35  	}
    36  
    37  	return nil
    38  }
    39  

View as plain text