...

Source file src/edge-infra.dev/pkg/edge/datasync/shoot/pubsub/pubsub_publisher.go

Documentation: edge-infra.dev/pkg/edge/datasync/shoot/pubsub

     1  package pubsub
     2  
     3  import (
     4  	"context"
     5  	"os"
     6  	"time"
     7  
     8  	"cloud.google.com/go/pubsub"
     9  	"github.com/go-logr/logr"
    10  
    11  	"edge-infra.dev/pkg/edge/datasync/shoot/config"
    12  
    13  	"edge-infra.dev/pkg/edge/datasync/shoot/model"
    14  )
    15  
    16  type PubSub struct {
    17  	client *pubsub.Client
    18  	topic  *pubsub.Topic
    19  	cfg    *config.Config
    20  	logger logr.Logger
    21  }
    22  
    23  // NewPubSub creates a new publisher to a Google Pub/Sub topic
    24  // This is a blocking call until publisher can be created.
    25  func NewPubSub(topic string, cfg *config.Config, logger logr.Logger) *PubSub {
    26  	pubsubClient := createPubSubClient(logger, cfg)
    27  	topicRef := createTopicReferance(pubsubClient, cfg, topic)
    28  
    29  	return &PubSub{
    30  		client: pubsubClient,
    31  		topic:  topicRef,
    32  		cfg:    cfg,
    33  		logger: logger,
    34  	}
    35  }
    36  
    37  // Publish publishes messages to a Google Pub/Sub topic. It returns the messages that failed to be published, if any.
    38  func (p *PubSub) Publish(messages []*model.Message) []*PublishFailureResult {
    39  	return p.publishMessages(p.topic, messages)
    40  }
    41  
    42  // TODO - refactor
    43  func createPubSubClient(logger logr.Logger, cfg *config.Config) *pubsub.Client {
    44  	logger.Info("Creating pub sub client", "projectID", cfg.ForemanProjectID)
    45  
    46  	pubSubClient, err := pubsub.NewClient(context.Background(), cfg.ForemanProjectID)
    47  
    48  	if err != nil {
    49  		logger.Error(err, "failed to create pubsub client, will exit")
    50  		os.Exit(1)
    51  	}
    52  
    53  	return pubSubClient
    54  }
    55  
    56  func createTopicReferance(pubsubClient *pubsub.Client, cfg *config.Config, topic string) *pubsub.Topic {
    57  	t := pubsubClient.Topic(topic)
    58  
    59  	t.PublishSettings.CountThreshold = cfg.PubsubBulkSize
    60  	t.PublishSettings.DelayThreshold = 2000 * time.Millisecond
    61  	t.PublishSettings.ByteThreshold = cfg.PubsubByteThreshold
    62  	return t
    63  }
    64  
    65  func (p *PubSub) publishMessages(topic *pubsub.Topic, messages []*model.Message) []*PublishFailureResult {
    66  	ctx := context.Background()
    67  
    68  	results := make([]*PublishFailureResult, 0)
    69  
    70  	for i := 0; i < len(messages); i++ {
    71  		message := &pubsub.Message{
    72  			Data:       messages[i].Payload,
    73  			Attributes: messages[i].Headers,
    74  		}
    75  
    76  		p.injectAttributes(message)
    77  
    78  		publishResult := topic.Publish(ctx, message)
    79  
    80  		publishFailureResult := &PublishFailureResult{
    81  			Message: messages[i],
    82  			Result:  publishResult,
    83  		}
    84  
    85  		results = append(results, publishFailureResult)
    86  	}
    87  
    88  	failedMessages := make([]*PublishFailureResult, 0)
    89  	for _, failure := range results {
    90  		_, err := failure.Result.Get(ctx)
    91  		if err != nil {
    92  			failedMessages = append(failedMessages, failure)
    93  			// logger.Warn(fmt.Sprintf("failed to publish message (%v). message will be handled again soon. (%v)",
    94  			// 	failure.Message.ID.String(), err))
    95  			continue
    96  		}
    97  		// logger.Trace(fmt.Sprintf("succeeded to publish message (%v)", failure.Message.ID.String()))
    98  	}
    99  
   100  	// if len(failedMessages) == 0 {
   101  	// 	// logger.Info(fmt.Sprintf("batch finished successfully. all %d messages were published (%v - %v)",
   102  	// 	// 	len(messages), messages[0].ID.String(), messages[len(messages)-1].ID.String()))
   103  	// }
   104  
   105  	return failedMessages
   106  }
   107  
   108  // injectAttributes injects env variables into the message attributes
   109  func (p *PubSub) injectAttributes(message *pubsub.Message) {
   110  	if message.Attributes["foreman_gcp_project_id"] == "" {
   111  		message.Attributes["foreman_gcp_project_id"] = p.cfg.ForemanProjectID
   112  	}
   113  }
   114  

View as plain text