1 package chariot
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10 "os"
11 "sync"
12 "time"
13
14 "cloud.google.com/go/pubsub"
15 "cloud.google.com/go/storage"
16 "google.golang.org/api/googleapi"
17
18 helmApi "github.com/fluxcd/helm-controller/api/v2beta1"
19 kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2"
20
21 "edge-infra.dev/pkg/edge/constants"
22 eamodel "edge-infra.dev/pkg/edge/edgeagent/model"
23 "edge-infra.dev/pkg/lib/mqtt"
24 )
25
// Severity values written to the "severity" field of every structured log
// entry emitted by this file.
const (
	sevInfo    = "info"
	sevWarning = "warning"
	sevError   = "error"
)
31
32
// loggerOutput is the shared destination for every structured (JSON) log
// entry written by this file. Each logging helper holds the embedded Mutex
// while encoding, so entries from concurrently running handleRequest
// goroutines are not interleaved. rw defaults to os.Stdout.
//
// NOTE(review): rw is an io.ReadWriter although this file only writes to it;
// presumably so tests can substitute a buffer and read entries back — confirm.
var loggerOutput = struct {
	sync.Mutex
	rw io.ReadWriter
}{
	rw: os.Stdout,
}
39
40
// Option configures a Daemon. NewDaemon applies options in the order given,
// and the first option that returns a non-nil error aborts construction.
type Option func(*Daemon) error
42
43
44 func OptionGoogleCloudStorage(client *storage.Client) Option {
45 return func(d *Daemon) error {
46 if client == nil {
47 return fmt.Errorf("Google Cloud Storage client must not be nil")
48 }
49 d.storer = NewGoogleCloudStorage(client)
50 return nil
51 }
52 }
53
54
55 func OptionPubSubResponsePublisher(p Publisher) Option {
56 return func(d *Daemon) error {
57 if p == nil {
58 return fmt.Errorf("Publisher must not be nil")
59 }
60 d.response = p
61 return nil
62 }
63 }
64
65
66 func OptionPubSubReceiver(ipsr IPubSubReceiver) Option {
67 return func(d *Daemon) error {
68 if ipsr == nil {
69 return fmt.Errorf("IPubSubReceiver must not be nil")
70 }
71 d.pubSubReceiver = ipsr
72 return nil
73 }
74 }
75
76
// Daemon receives chariot requests from pubsub, applies them to storage,
// publishes responses, and optionally sends edge-agent notifications.
// Construct it with NewDaemon and start it with Run.
type Daemon struct {
	// NOTE(review): the embedded Mutex is not used by any code visible in
	// this file; embedding it also exports Lock/Unlock on Daemon.
	sync.Mutex

	// pubSubReceiver delivers incoming request messages; set by
	// OptionPubSubReceiver.
	pubSubReceiver IPubSubReceiver

	// storer performs object Put/Delete; set by OptionGoogleCloudStorage.
	storer *GoogleCloudStorage

	// response publishes per-request responses; set by
	// OptionPubSubResponsePublisher. Run substitutes NopPublisher{} when nil.
	response Publisher

	// metrics records request, error, and storage counters; created by
	// NewDaemon.
	metrics *Metrics
}
88
89
90 func NewDaemon(options ...Option) (*Daemon, error) {
91 var d = new(Daemon)
92 for _, opt := range options {
93 if err := opt(d); err != nil {
94 return nil, fmt.Errorf("Error setting option: %w", err)
95 }
96 }
97 d.metrics = NewMetrics()
98 return d, nil
99 }
100
101
102 func (d *Daemon) Run(ctx context.Context) error {
103 if d.response == nil {
104 d.response = NopPublisher{}
105 }
106
107 err := d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle)
108 return fmt.Errorf("Error returned by Receive function: %w", err)
109 }
110
111
112 func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm IPubSubMessage) {
113 go d.handleRequest(ctx, ipsm)
114 }
115
116
117 func (d *Daemon) performStorageOperation(ctx context.Context, req Request) (si StorageInfo, err error) {
118 var objects []StorageObject
119 if objects, err = req.StorageObjects(); err != nil {
120 return si, fmt.Errorf("Error generating storage objects: %w", err)
121 }
122
123 switch req.Operation {
124 case OperationCreate:
125 return d.storer.Put(ctx, objects...)
126 case OperationDelete:
127 return d.storer.Delete(ctx, objects...)
128 default:
129 const panicmsg = "Requests should have been validated before performStorageOperation is called. Got unknown operation: %q"
130 panic(fmt.Sprintf(panicmsg, req.Operation))
131 }
132 }
133
134
135
136 func (d *Daemon) publishEventNotification(ctx context.Context, req Request, objects ...StorageObject) error {
137 switch req.Operation {
138 case OperationCreate:
139 return d.notify(ctx, req, objects...)
140 case OperationDelete:
141
142 default:
143 return nil
144 }
145 return nil
146 }
147
148 func (d *Daemon) notify(ctx context.Context, req Request, objects ...StorageObject) error {
149
150 notification := eamodel.NewNotificationMessage().
151 SetActor(req.Owner).
152 SetClusterEdgeID(req.Cluster)
153
154
155 syncKustomization := false
156
157 for _, obj := range objects {
158 var event *eamodel.Event
159
160
161
162 obj, err := ParseYamlGVKNN([]byte(obj.Content))
163 if err != nil {
164 return fmt.Errorf("error parsing storage object for notifier: %w", err)
165 }
166 switch obj.Kind {
167 case helmApi.HelmReleaseKind:
168 event = eamodel.BuildEdgeAgentEvent(obj.Metadata.Name, helmApi.HelmReleaseKind, constants.FluxSystem, eamodel.EventTypeReconcile)
169
170 default:
171 if syncKustomization {
172 continue
173 }
174 event = eamodel.BuildEdgeAgentEvent(eamodel.ChariotSync, kustomizeApi.KustomizationKind, constants.FluxEdgeNamespace, eamodel.EventTypeReconcile)
175
176 syncKustomization = true
177 }
178 if event != nil {
179 notification.AddEvent(*event)
180 }
181 }
182
183 if len(notification.Events) > 0 {
184
185 psNotifierClient, err := pubsub.NewClient(ctx, req.Banner)
186 if err != nil {
187 return fmt.Errorf("error creating pubsub client for notifications: %w", err)
188 }
189
190 defer psNotifierClient.Close()
191
192 ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, psNotifierClient, req.Banner, eamodel.EdgeAgentTopicAndOwner)
193 if err != nil {
194 return fmt.Errorf("error fetching pubsub topic for notifications: %w", err)
195 }
196
197 defer ipsnotifier.Stop()
198
199
200 return ipsnotifier.Publish(ctx, notification)
201 }
202 return nil
203 }
204
205
// handleRequest processes one pubsub message end to end:
//  1. decode the Request;
//  2. validate it;
//  3. run the storage operation;
//  4. publish a response and ack or nack the message;
//  5. when req.Notify is set, send edge-agent notifications.
//
// pubSubReceiverHandle runs it on its own goroutine per message.
//
// Outcomes:
//   - JSON decode failure or Validate failure: resp.err is set, the error
//     response is published, and the message is acked.
//   - Storage operation failure: resp.err is set, a retry event is logged, no
//     response is published, and the message is nacked.
//   - Success: the response is published, the message is acked, and then, if
//     req.Notify is true, notifications for si.ObjectsPut are sent.
//
// A deferred closure records request and error metrics from resp.err on every
// return path.
func (d *Daemon) handleRequest(ctx context.Context, ipsm IPubSubMessage) {
	// ackdeadline bounds work by AckDeadline. It is used for the decode-error
	// response, the storage operation, and the notification publish.
	var ackdeadline, cancel = context.WithTimeout(ctx, AckDeadline)
	defer cancel()

	// resp is the response to publish. resp.err also serves as the success
	// flag read by the deferred metrics closure.
	var resp = &chariotResponseMessage{
		requestID: ipsm.ID(),
	}

	// req is declared before the defer so the closure sees whatever gets
	// decoded into it (the zero Request if decoding fails).
	var req Request
	defer func() {
		var successful = resp.err == nil
		d.metrics.IncRequestsTotal(req, successful)

		// Failed requests are additionally counted by category.
		if !successful {
			d.metrics.IncErrorsTotal(req)
			if errors.Is(resp.err, context.DeadlineExceeded) {
				d.metrics.IncDeadlineExceededTotal(AckDeadline)
			}
			var gerr *googleapi.Error
			if errors.As(resp.err, &gerr) {
				d.metrics.IncGoogleAPIErrorsTotal(resp.err)
			}
		}
	}()

	// Undecodable request: report the error and ack.
	if err := json.Unmarshal(ipsm.Data(), &req); err != nil {
		resp.err = fmt.Errorf("Error decoding request JSON: %w", err)
		logRequestJSONError(ipsm, resp.err)

		if err = d.response.Publish(ackdeadline, resp); err != nil {
			logResponseError(ipsm.ID(), err)
		}
		ipsm.Ack()
		return
	}

	// Now that req is decoded, copy its identifying fields into the response.
	resp.owner = req.Owner
	resp.operation = req.Operation

	// Invalid request: report the error and ack.
	if err := req.Validate(); err != nil {
		resp.err = fmt.Errorf("Request validation error: %w", err)
		// NOTE(review): this emits the "Error parsing request JSON" log entry
		// even though the JSON decoded successfully; consider a dedicated
		// validation log entry.
		logRequestJSONError(ipsm, resp.err)

		// NOTE(review): this publishes with ctx, while the decode-error path
		// above uses ackdeadline — confirm which context is intended.
		if err = d.response.Publish(ctx, resp); err != nil {
			logResponseError(ipsm.ID(), err)
		}
		ipsm.Ack()
		return
	}

	// Run the storage operation under the ack deadline.
	var si StorageInfo
	si, resp.err = d.performStorageOperation(ackdeadline, req)

	// Storage metrics and info are recorded whether or not the operation failed.
	d.metrics.IncStorageOperationsTotal(req, si)
	logStorageInfo(ipsm.ID(), si)

	// Storage failure: log diagnostics and nack without publishing a response.
	if resp.err != nil {
		var reason = fmt.Sprintf("Storage operation failed: %v", resp.err)
		logRetryEvent(ipsm, reason)

		if errors.Is(resp.err, context.DeadlineExceeded) {
			logAckDeadlineExpired(ipsm, "Ack deadline reached while performing storage operation")
		}
		if IsGoogleAPIError(resp.err) {
			logGoogleAPIErr(ipsm, resp.err)
		}

		ipsm.Nack()
		return
	}

	// Success: publish the response using ctx (not ackdeadline), then ack.
	if err := d.response.Publish(ctx, resp); err != nil {
		logResponseError(ipsm.ID(), err)
	}
	ipsm.Ack()

	// Notifications are sent after the ack, so a notification failure is only
	// logged; it does not cause the request to be retried.
	if req.Notify {
		// NOTE(review): notify parses each object's Content. Confirm that
		// logStorageInfo above does not clear si.ObjectsPut[i].Content in place:
		// the StorageInfo copy it receives shares slice backing arrays with si.
		if err := d.publishEventNotification(ackdeadline, req, si.ObjectsPut...); err != nil {
			logNotificationError(resp.requestID, err)
		}
	}
}
303
304 const LogAckDeadlineExpiredMessage = "Ack deadline expired"
305
306 type LogAckDeadlineExpiredMessageObject struct {
307 Message string `json:"message"`
308 Severity string `json:"severity"`
309
310 RequestID string `json:"pubsub_request_id"`
311 TimeoutDuration string `json:"timeout_duration"`
312
313 Why string `json:"why,omitempty"`
314 }
315
316 func logAckDeadlineExpired(ipsm IPubSubMessage, why string) {
317 var l = LogAckDeadlineExpiredMessageObject{
318 Message: LogAckDeadlineExpiredMessage,
319 Severity: sevError,
320 RequestID: ipsm.ID(),
321 TimeoutDuration: fmt.Sprintf("%v", AckDeadline),
322 Why: why,
323 }
324
325 loggerOutput.Lock()
326 defer loggerOutput.Unlock()
327
328 json.NewEncoder(loggerOutput.rw).Encode(l)
329 }
330
331 type LogRetryEventMessageObject struct {
332 Message string `json:"message"`
333 Severity string `json:"severity"`
334
335 RequestID string `json:"pubsub_request_id"`
336
337 Reason string `json:"retry_reason"`
338 }
339
340 const LogRetryEventMessage = "Retry chariot request"
341
342 func logRetryEvent(ipsm IPubSubMessage, reason string) {
343 var l = LogRetryEventMessageObject{
344 Message: LogRetryEventMessage,
345 Severity: sevInfo,
346 RequestID: ipsm.ID(),
347 Reason: reason,
348 }
349
350 loggerOutput.Lock()
351 defer loggerOutput.Unlock()
352
353 json.NewEncoder(loggerOutput.rw).Encode(l)
354 }
355
356 type LogRequestJSONErrorMessageObject struct {
357 Message string `json:"message"`
358 Severity string `json:"severity"`
359
360 Err string `json:"error"`
361
362 Data []byte `json:"pubsub_data"`
363 RequestID string `json:"pubsub_request_id"`
364 }
365
366 const LogRequestJSONErrorMessage = "Error parsing request JSON"
367
368 func logRequestJSONError(ipsm IPubSubMessage, err error) {
369 var lrjemo = LogRequestJSONErrorMessageObject{
370 Message: LogRequestJSONErrorMessage,
371 Severity: sevError,
372 Err: err.Error(),
373 Data: ipsm.Data(),
374 RequestID: ipsm.ID(),
375 }
376
377 loggerOutput.Lock()
378 defer loggerOutput.Unlock()
379
380 json.NewEncoder(loggerOutput.rw).Encode(lrjemo)
381 }
382
// Log "message" values describing the outcome of processing a pubsub message.
const (
	// SuccessLogMessage marks a pubsub message that was processed successfully.
	SuccessLogMessage = "Successfully processed pubsub message"
	// FailureLogMessage marks a pubsub message whose processing failed.
	FailureLogMessage = "Failed to process pubsub message"
)

// PubSubLogMessageObject is the JSON shape of a log entry describing a pubsub
// message and, optionally, the error encountered while processing it.
//
// NOTE(review): encoding/json ignores omitempty for struct types such as
// time.Time, so pubsub_publish_time is always emitted (as the zero time when
// unset).
type PubSubLogMessageObject struct {
	Severity string `json:"severity"`
	Message  string `json:"message"`

	// Err is the processing error text, if any.
	Err string `json:"error,omitempty"`

	// Fields copied from the pubsub message itself.
	ID              string            `json:"pubsub_id"`
	Data            []byte            `json:"pubsub_data,omitempty"`
	Attributes      map[string]string `json:"pubsub_attributes,omitempty"`
	PublishTime     time.Time         `json:"pubsub_publish_time,omitempty"`
	DeliveryAttempt int               `json:"pubsub_delivery_attempt,omitempty"`
	OrderingKey     string            `json:"pubsub_ordering_key,omitempty"`
}
402
403 const ResponseErrorLogMessage = "Error publishing pubsub response"
404
405 type ResponseErrorLogMessageObject struct {
406 Severity string `json:"severity"`
407 Message string `json:"message"`
408
409
410 RequestID string `json:"request_pubsub_id"`
411
412 Error string `json:"error"`
413 }
414
415 func logResponseError(reqID string, err error) {
416 loggerOutput.Lock()
417 defer loggerOutput.Unlock()
418
419
420 json.NewEncoder(loggerOutput.rw).Encode(ResponseErrorLogMessageObject{
421 Severity: sevError,
422 Message: ResponseErrorLogMessage,
423 RequestID: reqID,
424 Error: err.Error(),
425 })
426 }
427
428 const LogNotificationErrorMessage = "Error sending notification message"
429
430 type NotificationErrorLogMessageObject struct {
431 Severity string `json:"severity"`
432 Message string `json:"message"`
433
434 RequestID string `json:"request_pubsub_id"`
435 Error string `json:"error"`
436 }
437
438 func logNotificationError(reqID string, err error) {
439 loggerOutput.Lock()
440 defer loggerOutput.Unlock()
441
442
443 json.NewEncoder(loggerOutput.rw).Encode(NotificationErrorLogMessageObject{
444 Severity: sevError,
445 Message: LogNotificationErrorMessage,
446 RequestID: reqID,
447 Error: err.Error(),
448 })
449 }
450
451 const StorageInfoLogMessage = "Storage Info"
452
453 type StorageInfoLogMessageObject struct {
454 Severity string `json:"severity"`
455 Message string `json:"message"`
456
457
458 RequestID string `json:"request_pubsub_id"`
459
460 StorageInfo StorageInfo `json:"storage_info,omitempty"`
461 }
462
463 func logStorageInfo(reqID string, si StorageInfo) {
464 var sev = sevInfo
465 var msg = StorageInfoLogMessage
466
467 if len(si.ObjectsDoNotExist) > 0 {
468 sev = sevWarning
469 }
470 if len(si.Errors) > 0 {
471 sev = sevError
472 } else {
473
474 for i := range si.ObjectsPut {
475 si.ObjectsPut[i].Content = ""
476 }
477 for i := range si.ObjectsDeleted {
478 si.ObjectsDeleted[i].Content = ""
479 }
480 }
481
482 loggerOutput.Lock()
483 defer loggerOutput.Unlock()
484
485 json.NewEncoder(loggerOutput.rw).Encode(StorageInfoLogMessageObject{
486 Severity: sev,
487 Message: msg,
488 RequestID: reqID,
489 StorageInfo: si,
490 })
491 }
492
493 func IsGoogleAPIError(err error) bool {
494 var gerr *googleapi.Error
495 return errors.As(err, &gerr)
496 }
497
498 const LogGoogleAPIErrMessage = "Googleapi Error"
499
500 type LogGoogleAPIErrMessageObject struct {
501 Severity string `json:"severity"`
502 Message string `json:"message"`
503
504 RequestID string `json:"pubsub_request_id"`
505
506 Error string `json:"error"`
507
508 Code int `json:"response_status_code"`
509
510 Header http.Header `json:"response_header"`
511
512 Body string `json:"response_body"`
513
514 Errors []googleapi.ErrorItem `json:"errors"`
515 }
516
517 func logGoogleAPIErr(ipsm IPubSubMessage, err error) {
518 var gerr *googleapi.Error
519 if ok := errors.As(err, &gerr); !ok {
520 return
521 }
522
523 loggerOutput.Lock()
524 defer loggerOutput.Unlock()
525
526 var l = LogGoogleAPIErrMessageObject{
527 Severity: sevError,
528 Message: LogGoogleAPIErrMessage,
529 RequestID: ipsm.ID(),
530 Error: err.Error(),
531 Code: gerr.Code,
532 Header: gerr.Header,
533 Body: gerr.Body,
534 Errors: gerr.Errors,
535 }
536
537
538 json.NewEncoder(loggerOutput.rw).Encode(l)
539 }
540
View as plain text