...
1 package pubsub
2
3 import (
4 "context"
5 "os"
6 "time"
7
8 "cloud.google.com/go/pubsub"
9 "github.com/go-logr/logr"
10
11 "edge-infra.dev/pkg/edge/datasync/shoot/config"
12
13 "edge-infra.dev/pkg/edge/datasync/shoot/model"
14 )
15
16 type PubSub struct {
17 client *pubsub.Client
18 topic *pubsub.Topic
19 cfg *config.Config
20 logger logr.Logger
21 }
22
23
24
25 func NewPubSub(topic string, cfg *config.Config, logger logr.Logger) *PubSub {
26 pubsubClient := createPubSubClient(logger, cfg)
27 topicRef := createTopicReferance(pubsubClient, cfg, topic)
28
29 return &PubSub{
30 client: pubsubClient,
31 topic: topicRef,
32 cfg: cfg,
33 logger: logger,
34 }
35 }
36
37
38 func (p *PubSub) Publish(messages []*model.Message) []*PublishFailureResult {
39 return p.publishMessages(p.topic, messages)
40 }
41
42
43 func createPubSubClient(logger logr.Logger, cfg *config.Config) *pubsub.Client {
44 logger.Info("Creating pub sub client", "projectID", cfg.ForemanProjectID)
45
46 pubSubClient, err := pubsub.NewClient(context.Background(), cfg.ForemanProjectID)
47
48 if err != nil {
49 logger.Error(err, "failed to create pubsub client, will exit")
50 os.Exit(1)
51 }
52
53 return pubSubClient
54 }
55
56 func createTopicReferance(pubsubClient *pubsub.Client, cfg *config.Config, topic string) *pubsub.Topic {
57 t := pubsubClient.Topic(topic)
58
59 t.PublishSettings.CountThreshold = cfg.PubsubBulkSize
60 t.PublishSettings.DelayThreshold = 2000 * time.Millisecond
61 t.PublishSettings.ByteThreshold = cfg.PubsubByteThreshold
62 return t
63 }
64
65 func (p *PubSub) publishMessages(topic *pubsub.Topic, messages []*model.Message) []*PublishFailureResult {
66 ctx := context.Background()
67
68 results := make([]*PublishFailureResult, 0)
69
70 for i := 0; i < len(messages); i++ {
71 message := &pubsub.Message{
72 Data: messages[i].Payload,
73 Attributes: messages[i].Headers,
74 }
75
76 p.injectAttributes(message)
77
78 publishResult := topic.Publish(ctx, message)
79
80 publishFailureResult := &PublishFailureResult{
81 Message: messages[i],
82 Result: publishResult,
83 }
84
85 results = append(results, publishFailureResult)
86 }
87
88 failedMessages := make([]*PublishFailureResult, 0)
89 for _, failure := range results {
90 _, err := failure.Result.Get(ctx)
91 if err != nil {
92 failedMessages = append(failedMessages, failure)
93
94
95 continue
96 }
97
98 }
99
100
101
102
103
104
105 return failedMessages
106 }
107
108
109 func (p *PubSub) injectAttributes(message *pubsub.Message) {
110 if message.Attributes["foreman_gcp_project_id"] == "" {
111 message.Attributes["foreman_gcp_project_id"] = p.cfg.ForemanProjectID
112 }
113 }
114
View as plain text