package sender

import (
	"os"

	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
	"edge-infra.dev/pkg/edge/datasync/internal/config"
	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
	"edge-infra.dev/pkg/lib/fog"
)

// Factory creates MessageSender instances. Every sender it produces is handed
// the same message persister that the factory was constructed with.
type Factory struct {
	// msgPersister is passed to each sender built by GetInstance.
	msgPersister persister.MessagePersister
	// logger records which kind of sender is being created.
	logger logr.Logger
}

// NewSenderFactory returns a Factory that uses msgPersister for the senders it
// builds and a logger obtained from fog.New.
func NewSenderFactory(msgPersister persister.MessagePersister) *Factory {
	factory := new(Factory)
	factory.msgPersister = msgPersister
	factory.logger = fog.New()
	return factory
}

// GetInstance builds a MessageSender chosen by the TARGET environment
// variable.
//
// When TARGET is exactly "pubsub", a sender from NewPubsubSender is returned
// and cfg is not consulted. For any other value (including unset), a Kafka
// producer is created from cfg and wrapped with NewKafkaSender; an error from
// kafkaclient.NewProducer is returned as-is together with a nil sender.
func (f *Factory) GetInstance(cfg *kafkaclient.Config, chirpConfig *config.Config) (MessageSender, error) {
	switch os.Getenv("TARGET") {
	case "pubsub":
		f.logger.Info("creating PubSub sender")
		return NewPubsubSender(f.msgPersister, chirpConfig), nil
	default:
		// Kafka is the fallback for every TARGET value other than "pubsub".
		f.logger.Info("creating Kafka sender based on file system persister")

		producer, err := kafkaclient.NewProducer(cfg)
		if err != nil {
			return nil, err
		}
		return NewKafkaSender(producer, f.msgPersister, chirpConfig), nil
	}
}