1 package msgsvc
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/go-logr/logr"
10
11 "edge-infra.dev/pkg/lib/fog"
12 "edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
13 "edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
14 )
15
16 type topicEntry struct {
17 topicID string
18 projectID string
19 }
20
21
22
23
24 type MessageService struct {
25 ps clientItfc
26 topicCache map[topicEntry]topicItfc
27 logger logr.Logger
28
29 topicMu sync.Mutex
30 }
31
32
33
34 func NewMessageService(ctx context.Context) (*MessageService, error) {
35
36
37 projectID := "ts"
38 client, err := newClient(ctx, projectID)
39 if err != nil {
40 return nil, err
41 }
42 logr := fog.FromContext(ctx)
43 msgSvc := MessageService{
44 ps: client,
45 topicCache: make(map[topicEntry]topicItfc),
46 logger: logr.WithName("msgsvc"),
47 topicMu: sync.Mutex{},
48 }
49
50 return &msgSvc, nil
51 }
52
53
54
55 func (ms *MessageService) CreateSubscription(ctx context.Context, sessionID string, subscriptionID string, projectID string, topicID string) error {
56 cfg := subscriptionCfg{
57 topicName: topicID,
58 projectID: projectID,
59
60 retentionDuration: 1 * time.Hour,
61 expirationPolicy: 24 * time.Hour,
62
63 filter: fmt.Sprintf(`attributes.sessionId="%s"`, sessionID),
64 }
65
66 _, err := ms.ps.CreateSubscription(ctx, subscriptionID, cfg)
67 if err != nil {
68 return fmt.Errorf("error creating subscription: %w", err)
69 }
70 return nil
71 }
72
73 func (ms *MessageService) DeleteSubscription(ctx context.Context, subscriptionID string, projectID string) error {
74 sub := ms.ps.SubscriptionInProject(subscriptionID, projectID)
75 return sub.Delete(ctx)
76 }
77
78 func createCommandResponse(msg messageItfc) (msgdata.CommandResponse, error) {
79 return msgdata.NewCommandResponse(msg.Data(), msg.Attributes())
80 }
81
82
83
84
85
86
87 func (ms *MessageService) Subscribe(ctx context.Context, subscriptionID string, projectID string, handler func(ctx context.Context, msg msgdata.CommandResponse), filter map[string]string) error {
88 return ms.ps.SubscriptionInProject(subscriptionID, projectID).Receive(ctx, func(ctx context.Context, msg messageItfc) {
89 log := fog.FromContext(ctx)
90 if !isFilterMatch(msg.Attributes(), filter) {
91 msg.Nack()
92 return
93 }
94
95 cResp, err := createCommandResponse(msg)
96 if err != nil {
97 log.Error(err, "Error parsing message", "messageID", msg.ID())
98 msg.Nack()
99 return
100 }
101
102 log = log.WithValues("sessionID", cResp.Attributes().SessionID, "requestMessageID", cResp.Attributes().ReqMsgID, "messageID", msg.ID())
103 ctx = fog.IntoContext(ctx, log)
104 handler(ctx, cResp)
105 log.Info("Acknowledging message", "messageID", msg.ID())
106 msg.Ack()
107 })
108 }
109
110 func isFilterMatch(attr, filter map[string]string) bool {
111 for k, v := range filter {
112 attrVal, ok := attr[k]
113 if attrVal != v || !ok {
114 return false
115 }
116 }
117 return true
118 }
119
120 func createPubsubMessage(msg msgdata.Request) (messageItfc, error) {
121 data, err := msg.Data()
122 if err != nil {
123 return nil, err
124 }
125 resp := newMessage(data, msg.Attributes())
126 return resp, nil
127 }
128
129
130
131
132
133 func (ms *MessageService) Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error {
134 log := fog.FromContext(ctx, "commandID", message.Attributes()[eaconst.CommandIDKey], "sessionID", message.Attributes()[eaconst.SessionIDKey])
135 ctx = fog.IntoContext(ctx, log)
136 log.Info("Received message to publish")
137 msg, err := createPubsubMessage(message)
138 if err != nil {
139 return err
140 }
141
142 msg.SetOrderingKey(message.Attributes()[eaconst.CommandIDKey])
143
144 top := getTopic(log, &ms.topicMu, topic, projectID, ms.topicCache, ms.ps)
145
146 log.Info("Publishing message", "topicID", top.ID())
147 res := top.Publish(ctx, msg)
148 _, err = res.Get(ctx)
149
150 return err
151 }
152
153 func getTopic(log logr.Logger, lock *sync.Mutex, topic string, projectID string, topicCache map[topicEntry]topicItfc, client topicInProjecter) topicItfc {
154 cacheEntry := topicEntry{
155 topicID: topic,
156 projectID: projectID,
157 }
158
159 lock.Lock()
160
161 top, ok := topicCache[cacheEntry]
162
163 if !ok {
164 log.Info("Topic not in cache, creating new topic", "topic", topic, "projectID", projectID)
165 top = client.TopicInProject(topic, projectID)
166 top.SetOrdering(true)
167 } else {
168 log.Info("Topic discovered in cache, reusing.", "topic", topic, "projectID", projectID)
169 }
170
171 topicCache[cacheEntry] = top
172
173 lock.Unlock()
174
175 return top
176 }
177
178
179
180
181 func (ms *MessageService) StopPublish(topic string, projectID string) {
182 entry := topicEntry{
183 topicID: topic,
184 projectID: projectID,
185 }
186
187 ms.topicMu.Lock()
188 top, ok := ms.topicCache[entry]
189
190 if !ok {
191 ms.logger.Info("Topic not in cache, nothing to do.", "topic", topic, "projectID", projectID)
192 ms.topicMu.Unlock()
193 return
194 }
195
196 ms.logger.Info("Topic discovered. Removing from cache and stopping topic", "topic", topic, "projectID", projectID)
197 delete(ms.topicCache, entry)
198
199 ms.topicMu.Unlock()
200
201 top.Stop()
202 }
203
View as plain text