...
1 package sender
2
3 import (
4 "os"
5
6 "edge-infra.dev/pkg/edge/datasync/internal/config"
7
8 "github.com/go-logr/logr"
9
10 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
11 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
12 "edge-infra.dev/pkg/lib/fog"
13 )
14
15 type Factory struct {
16 msgPersister persister.MessagePersister
17 logger logr.Logger
18 }
19
20 func NewSenderFactory(msgPersister persister.MessagePersister) *Factory {
21 return &Factory{
22 msgPersister: msgPersister,
23 logger: fog.New(),
24 }
25 }
26
27 func (f *Factory) GetInstance(cfg *kafkaclient.Config, chirpConfig *config.Config) (MessageSender, error) {
28 target := os.Getenv("TARGET")
29
30 if target == "pubsub" {
31 f.logger.Info("creating PubSub sender")
32 return NewPubsubSender(f.msgPersister, chirpConfig), nil
33 }
34
35 f.logger.Info("creating Kafka sender based on file system persister")
36 kafkaProducer, err := kafkaclient.NewProducer(cfg)
37 if err != nil {
38 return nil, err
39 }
40 return NewKafkaSender(kafkaProducer, f.msgPersister, chirpConfig), nil
41 }
42
View as plain text