...
1 package sender
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "sync"
8 "sync/atomic"
9
10 "github.com/go-logr/logr"
11
12 "edge-infra.dev/pkg/lib/fog"
13
14 "cloud.google.com/go/pubsub"
15 "google.golang.org/api/option"
16
17 "edge-infra.dev/pkg/edge/datasync/chirp/model"
18 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
19
20 "edge-infra.dev/pkg/edge/datasync/internal/collections"
21 "edge-infra.dev/pkg/edge/datasync/internal/config"
22 )
23
24 type pubsubSender struct {
25 client *pubsub.Client
26 messagePersister persister.MessagePersister
27 logger logr.Logger
28 chirpConfig *config.Config
29 }
30
31 func NewPubsubSender(msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender {
32 credentialsPath := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
33
34 logger := fog.New()
35
36 settings, err := config.GetPubSubSettings()
37 if err != nil {
38
39 logger.Info("invalid PubSub settings. check TARGET_SETTINGS env")
40 }
41
42 client, err := pubsub.NewClient(context.Background(), settings.ProjectID, option.WithCredentialsFile(credentialsPath))
43 if err != nil {
44
45
46
47
48 logger.Error(err, "failed to create pubsub sender")
49 }
50
51 return &pubsubSender{
52 client: client,
53 messagePersister: msgPersister,
54 logger: logger,
55 chirpConfig: chirpConfig,
56 }
57 }
58
59 func (s *pubsubSender) Send(messages map[string]model.Message) []model.Message {
60 var sentMessages = make([]model.Message, 0)
61
62 var wg sync.WaitGroup
63 var totalErrors uint64
64
65 availableMessageTypes := config.GetRoutes(s.chirpConfig)
66 msgType := ""
67 for _, msg := range messages {
68 msgType = msg.Type
69 break
70 }
71
72 messageTypeWithFallback := msgType
73 if !collections.Include(availableMessageTypes, msgType) {
74 messageTypeWithFallback = "public"
75 }
76
77 topicID := config.GetTopics(s.chirpConfig)[messageTypeWithFallback]
78 t := s.client.Topic(topicID)
79
80 index := 0
81 for _, message := range messages {
82 index++
83
84 result := t.Publish(context.Background(), &pubsub.Message{
85 Data: message.Payload,
86 Attributes: map[string]string{
87 "id": message.ID.String(),
88 "type": message.Type,
89 "organization": message.TenantID,
90 "signature": message.Signature,
91 "created": message.CreatedAt.String(),
92 },
93 })
94
95 wg.Add(1)
96 go func(msg model.Message, res *pubsub.PublishResult) {
97 defer wg.Done()
98
99
100 serverID, err := res.Get(context.Background())
101 if err != nil {
102 s.logger.Error(err, "failed to publish to pubsub")
103 atomic.AddUint64(&totalErrors, 1)
104 return
105 }
106
107 sentMessages = append(sentMessages, msg)
108 s.logger.Info(fmt.Sprintf("published message to pubsub. server id is %v", serverID))
109 }(message, result)
110 }
111
112 wg.Wait()
113
114 if totalErrors > 0 {
115
116 s.logger.Info(fmt.Sprintf("%d of %d messages did not publish successfully", totalErrors, len(messages)))
117 }
118
119 _ = s.messagePersister.Delete(sentMessages)
120
121 return sentMessages
122 }
123
View as plain text