...
1 package worker
2
3 import (
4 "fmt"
5 "time"
6
7 "github.com/go-logr/logr"
8
9 "edge-infra.dev/pkg/lib/fog"
10
11 "edge-infra.dev/pkg/edge/datasync/chirp/provider"
12 "edge-infra.dev/pkg/edge/datasync/chirp/sender"
13
14 "edge-infra.dev/pkg/edge/datasync/internal/config"
15 )
16
17 type messageWorker struct {
18 partition int
19 messageProvider provider.MessageProvider
20 messageSender sender.MessageSender
21 logger logr.Logger
22 config *config.Config
23 }
24
25 func NewMessageWorker(partition int, msgProvider provider.MessageProvider, msgSender sender.MessageSender, cfg *config.Config) Worker {
26 return &messageWorker{
27 partition: partition,
28 messageProvider: msgProvider,
29 messageSender: msgSender,
30 logger: fog.New(),
31 config: cfg,
32 }
33 }
34
35 func (w *messageWorker) DoWork() {
36 shouldRun := w.config.RunMessageWorker
37 delayDuration := time.Duration(w.config.WorkerPolling) * time.Second
38
39 for shouldRun {
40 messages := w.messageProvider.GetMessages()
41 if len(messages) > 0 {
42 sentMessages := w.messageSender.Send(messages)
43 if len(sentMessages) > 0 {
44 for _, msg := range sentMessages {
45 message := fmt.Sprintf("successfully sent message ID: %s", msg.ID)
46 w.logger.Info(message)
47 }
48
49
50
51
52
53 }
54 } else {
55
56
57
58
59
60
61 time.Sleep(delayDuration)
62 }
63 }
64 }
65
View as plain text