...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/sender/pubsub-sender.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/sender

     1  package sender
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"sync"
     8  	"sync/atomic"
     9  
    10  	"github.com/go-logr/logr"
    11  
    12  	"edge-infra.dev/pkg/lib/fog"
    13  
    14  	"cloud.google.com/go/pubsub"
    15  	"google.golang.org/api/option"
    16  
    17  	"edge-infra.dev/pkg/edge/datasync/chirp/model"
    18  	"edge-infra.dev/pkg/edge/datasync/chirp/persister"
    19  
    20  	"edge-infra.dev/pkg/edge/datasync/internal/collections"
    21  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    22  )
    23  
    24  type pubsubSender struct {
    25  	client           *pubsub.Client
    26  	messagePersister persister.MessagePersister
    27  	logger           logr.Logger
    28  	chirpConfig      *config.Config
    29  }
    30  
    31  func NewPubsubSender(msgPersister persister.MessagePersister, chirpConfig *config.Config) MessageSender {
    32  	credentialsPath := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
    33  
    34  	logger := fog.New()
    35  
    36  	settings, err := config.GetPubSubSettings()
    37  	if err != nil {
    38  		// logger.Error("invalid PubSub settings. check TARGET_SETTINGS env")
    39  		logger.Info("invalid PubSub settings. check TARGET_SETTINGS env")
    40  	}
    41  
    42  	client, err := pubsub.NewClient(context.Background(), settings.ProjectID, option.WithCredentialsFile(credentialsPath))
    43  	if err != nil {
    44  		// logger.WithFields(map[string]interface{}{
    45  		// 	"source": "pubsub-sender.NewPubSubSender",
    46  		// 	"error":  err.Error(),
    47  		// }).Fatalf("failed to create pubsub sender")
    48  		logger.Error(err, "failed to create pubsub sender")
    49  	}
    50  
    51  	return &pubsubSender{
    52  		client:           client,
    53  		messagePersister: msgPersister,
    54  		logger:           logger,
    55  		chirpConfig:      chirpConfig,
    56  	}
    57  }
    58  
    59  func (s *pubsubSender) Send(messages map[string]model.Message) []model.Message {
    60  	var sentMessages = make([]model.Message, 0)
    61  
    62  	var wg sync.WaitGroup
    63  	var totalErrors uint64
    64  
    65  	availableMessageTypes := config.GetRoutes(s.chirpConfig)
    66  	msgType := ""
    67  	for _, msg := range messages {
    68  		msgType = msg.Type
    69  		break
    70  	}
    71  
    72  	messageTypeWithFallback := msgType
    73  	if !collections.Include(availableMessageTypes, msgType) {
    74  		messageTypeWithFallback = "public"
    75  	}
    76  
    77  	topicID := config.GetTopics(s.chirpConfig)[messageTypeWithFallback]
    78  	t := s.client.Topic(topicID)
    79  
    80  	index := 0
    81  	for _, message := range messages {
    82  		index++
    83  
    84  		result := t.Publish(context.Background(), &pubsub.Message{
    85  			Data: message.Payload,
    86  			Attributes: map[string]string{
    87  				"id":           message.ID.String(),
    88  				"type":         message.Type,
    89  				"organization": message.TenantID,
    90  				"signature":    message.Signature,
    91  				"created":      message.CreatedAt.String(),
    92  			},
    93  		})
    94  
    95  		wg.Add(1)
    96  		go func(msg model.Message, res *pubsub.PublishResult) {
    97  			defer wg.Done()
    98  			// The Get method blocks until a server-generated ID or
    99  			// an error is returned for the published message.
   100  			serverID, err := res.Get(context.Background())
   101  			if err != nil {
   102  				s.logger.Error(err, "failed to publish to pubsub")
   103  				atomic.AddUint64(&totalErrors, 1)
   104  				return
   105  			}
   106  
   107  			sentMessages = append(sentMessages, msg)
   108  			s.logger.Info(fmt.Sprintf("published message to pubsub. server id is %v", serverID))
   109  		}(message, result)
   110  	}
   111  
   112  	wg.Wait()
   113  
   114  	if totalErrors > 0 {
   115  		// s.logger.Error(fmt.Sprintf("%d of %d messages did not publish successfully", totalErrors, len(messages)))
   116  		s.logger.Info(fmt.Sprintf("%d of %d messages did not publish successfully", totalErrors, len(messages)))
   117  	}
   118  
   119  	_ = s.messagePersister.Delete(sentMessages)
   120  
   121  	return sentMessages
   122  }
   123  

View as plain text