package sender import ( "context" "fmt" "os" "sync" "sync/atomic" "github.com/go-logr/logr" "edge-infra.dev/pkg/lib/fog" "cloud.google.com/go/pubsub" "google.golang.org/api/option" "edge-infra.dev/pkg/edge/datasync/chirp/model" "edge-infra.dev/pkg/edge/datasync/chirp/persister" "edge-infra.dev/pkg/edge/datasync/internal/collections" "edge-infra.dev/pkg/edge/datasync/internal/config" ) type pubsubSender struct { client *pubsub.Client messagePersister persister.MessagePersister logger logr.Logger chirpConfig *config.Config } func NewPubsubSender(msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender { credentialsPath := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") logger := fog.New() settings, err := config.GetPubSubSettings() if err != nil { // logger.Error("invalid PubSub settings. check TARGET_SETTINGS env") logger.Info("invalid PubSub settings. check TARGET_SETTINGS env") } client, err := pubsub.NewClient(context.Background(), settings.ProjectID, option.WithCredentialsFile(credentialsPath)) if err != nil { // logger.WithFields(map[string]interface{}{ // "source": "pubsub-sender.NewPubSubSender", // "error": err.Error(), // }).Fatalf("failed to create pubsub sender") logger.Error(err, "failed to create pubsub sender") } return &pubsubSender{ client: client, messagePersister: msgPersister, logger: logger, chirpConfig: chirpConfig, } } func (s *pubsubSender) Send(messages map[string]model.Message) []model.Message { var sentMessages = make([]model.Message, 0) var wg sync.WaitGroup var totalErrors uint64 availableMessageTypes := config.GetRoutes(s.chirpConfig) msgType := "" for _, msg := range messages { msgType = msg.Type break } messageTypeWithFallback := msgType if !collections.Include(availableMessageTypes, msgType) { messageTypeWithFallback = "public" } topicID := config.GetTopics(s.chirpConfig)[messageTypeWithFallback] t := s.client.Topic(topicID) index := 0 for _, message := range messages { index++ result := t.Publish(context.Background(), &pubsub.Message{ Data: message.Payload, Attributes: map[string]string{ "id": message.ID.String(), "type": message.Type, "organization": message.TenantID, "signature": message.Signature, "created": message.CreatedAt.String(), }, }) wg.Add(1) go func(msg model.Message, res *pubsub.PublishResult) { defer wg.Done() // The Get method blocks until a server-generated ID or // an error is returned for the published message. serverID, err := res.Get(context.Background()) if err != nil { s.logger.Error(err, "failed to publish to pubsub") atomic.AddUint64(&totalErrors, 1) return } sentMessages = append(sentMessages, msg) s.logger.Info(fmt.Sprintf("published message to pubsub. server id is %v", serverID)) }(message, result) } wg.Wait() if totalErrors > 0 { // s.logger.Error(fmt.Sprintf("%d of %d messages did not publish successfully", totalErrors, len(messages))) s.logger.Info(fmt.Sprintf("%d of %d messages did not publish successfully", totalErrors, len(messages))) } _ = s.messagePersister.Delete(sentMessages) return sentMessages }