package pubsub

import (
	"context"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/datasync/shoot/config"
	"edge-infra.dev/pkg/edge/datasync/shoot/model"
)

// PubSub publishes messages to a single Google Pub/Sub topic.
type PubSub struct {
	client *pubsub.Client // underlying Pub/Sub client, created for cfg.ForemanProjectID
	topic  *pubsub.Topic  // topic handle with batching settings applied
	cfg    *config.Config // source of the project ID and batching thresholds
	logger logr.Logger
}

// NewPubSub creates a new publisher to a Google Pub/Sub topic.
//
// The client is created for cfg.ForemanProjectID and the topic handle is
// configured with the batching thresholds from cfg. If the client cannot be
// created the error is logged and the process exits with status 1, so this
// function never returns nil.
func NewPubSub(topic string, cfg *config.Config, logger logr.Logger) *PubSub {
	pubsubClient := createPubSubClient(logger, cfg)
	topicRef := createTopicReferance(pubsubClient, cfg, topic)

	return &PubSub{
		client: pubsubClient,
		topic:  topicRef,
		cfg:    cfg,
		logger: logger,
	}
}

// Publish publishes messages to the configured Google Pub/Sub topic.
// It returns the messages that failed to be published, if any; the returned
// slice is empty (never nil) when every message was published.
func (p *PubSub) Publish(messages []*model.Message) []*PublishFailureResult {
	return p.publishMessages(p.topic, messages)
}

// createPubSubClient builds a Pub/Sub client for cfg.ForemanProjectID.
// On failure it logs the error and terminates the process with exit code 1.
//
// TODO - refactor
func createPubSubClient(logger logr.Logger, cfg *config.Config) *pubsub.Client {
	logger.Info("Creating pub sub client", "projectID", cfg.ForemanProjectID)

	pubSubClient, err := pubsub.NewClient(context.Background(), cfg.ForemanProjectID)
	if err != nil {
		logger.Error(err, "failed to create pubsub client, will exit")
		os.Exit(1)
	}

	return pubSubClient
}

// createTopicReferance returns a handle for the named topic with its publish
// batching settings taken from cfg: a batch is bounded by cfg.PubsubBulkSize
// messages and cfg.PubsubByteThreshold bytes, with a fixed two-second delay
// threshold.
func createTopicReferance(pubsubClient *pubsub.Client, cfg *config.Config, topic string) *pubsub.Topic {
	t := pubsubClient.Topic(topic)
	t.PublishSettings.CountThreshold = cfg.PubsubBulkSize
	t.PublishSettings.DelayThreshold = 2 * time.Second
	t.PublishSettings.ByteThreshold = cfg.PubsubByteThreshold

	return t
}

// publishMessages publishes every message to topic and then waits for each
// publish result. It returns one PublishFailureResult per message whose
// result reported an error, in the same order as the input.
func (p *PubSub) publishMessages(topic *pubsub.Topic, messages []*model.Message) []*PublishFailureResult {
	ctx := context.Background()

	// First hand every message to the topic so results are only awaited once
	// the whole input has been submitted.
	results := make([]*PublishFailureResult, 0, len(messages))
	for _, msg := range messages {
		message := &pubsub.Message{
			Data:       msg.Payload,
			Attributes: msg.Headers,
		}
		p.injectAttributes(message)

		results = append(results, &PublishFailureResult{
			Message: msg,
			Result:  topic.Publish(ctx, message),
		})
	}

	// Then wait for every result and keep only the ones that failed.
	// The slice is deliberately non-nil so callers always get an empty slice
	// rather than nil on full success.
	failedMessages := make([]*PublishFailureResult, 0)
	for _, result := range results {
		if _, err := result.Result.Get(ctx); err != nil {
			// TODO: per-message failure/success logging is currently disabled;
			// re-enable it using p.logger once the log format is settled.
			failedMessages = append(failedMessages, result)
		}
	}

	return failedMessages
}

// injectAttributes sets the "foreman_gcp_project_id" attribute on the message
// to the configured project ID unless the message already carries a non-empty
// value for it.
func (p *PubSub) injectAttributes(message *pubsub.Message) {
	const projectIDAttr = "foreman_gcp_project_id"

	// Reading from a nil map is safe, so this check works with or without
	// existing attributes.
	if message.Attributes[projectIDAttr] != "" {
		return
	}

	// Messages built from nil headers have a nil attribute map; writing to it
	// would panic, so allocate one first.
	if message.Attributes == nil {
		message.Attributes = make(map[string]string, 1)
	}
	message.Attributes[projectIDAttr] = p.cfg.ForemanProjectID
}