...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/sender/senderFactory.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/sender

     1  package sender
     2  
     3  import (
     4  	"os"
     5  
     6  	"edge-infra.dev/pkg/edge/datasync/internal/config"
     7  
     8  	"github.com/go-logr/logr"
     9  
    10  	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
    11  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    12  	"edge-infra.dev/pkg/lib/fog"
    13  )
    14  
    15  type Factory struct {
    16  	msgPersister persister.MessagePersister
    17  	logger       logr.Logger
    18  }
    19  
    20  func NewSenderFactory(msgPersister persister.MessagePersister) *Factory {
    21  	return &Factory{
    22  		msgPersister: msgPersister,
    23  		logger:       fog.New(),
    24  	}
    25  }
    26  
    27  func (f *Factory) GetInstance(cfg *kafkaclient.Config, chirpConfig *config.Config) (MessageSender, error) {
    28  	target := os.Getenv("TARGET")
    29  
    30  	if target == "pubsub" {
    31  		f.logger.Info("creating PubSub sender")
    32  		return NewPubsubSender(f.msgPersister, chirpConfig), nil
    33  	}
    34  
    35  	f.logger.Info("creating Kafka sender based on file system persister")
    36  	kafkaProducer, err := kafkaclient.NewProducer(cfg)
    37  	if err != nil {
    38  		return nil, err
    39  	}
    40  	return NewKafkaSender(kafkaProducer, f.msgPersister, chirpConfig), nil
    41  }
    42  

View as plain text