...
1 package metric
2
3 import (
4 "fmt"
5 "os"
6 "path/filepath"
7 "regexp"
8 "time"
9
10 "github.com/prometheus/client_golang/prometheus"
11
12 "edge-infra.dev/pkg/edge/datasync/internal/config"
13 "edge-infra.dev/pkg/edge/datasync/internal/outbox"
14 "edge-infra.dev/pkg/lib/fog"
15 )
16
17 var outboxMessagesGauge = prometheus.NewGaugeVec(
18 prometheus.GaugeOpts{
19 Name: "data_sync_messaging_num_of_messages_in_outbox_gauge",
20 Help: "number of messages in data-sync-messaging outboxes",
21 },
22 []string{"outbox"})
23
24 var outboxMessagesSizeHistogram = prometheus.NewHistogramVec(
25 prometheus.HistogramOpts{
26 Name: "data_sync_messaging_size_of_messages_in_outbox",
27 Help: "size of messages in data-sync-messaging outboxes by message type",
28
29 Buckets: prometheus.ExponentialBuckets(1024, 10, 5),
30 },
31 []string{"outbox", "type", "partition"})
32
33 type outboxMessagesMetric struct {
34 chirpConfig *config.Config
35 }
36
37 func NewOutboxMessagesMetric(chirpConfig *config.Config) Metric {
38 prometheus.MustRegister(outboxMessagesGauge)
39 prometheus.MustRegister(outboxMessagesSizeHistogram)
40
41 return &outboxMessagesMetric{chirpConfig: chirpConfig}
42 }
43
44 func (s *outboxMessagesMetric) StartRecording() {
45 logger := fog.New()
46
47 logger.Info("starting to record metrics...")
48 outboxPath := config.GetOutboxPath(s.chirpConfig)
49 messageTypes := config.GetRoutes(s.chirpConfig)
50
51 for {
52
53 outboxMessagesSizeHistogram.Reset()
54 for _, messageType := range messageTypes {
55 count := countMessages(outboxPath, messageType)
56 outboxMessagesGauge.WithLabelValues(messageType).Set(float64(count))
57 }
58
59 time.Sleep(15 * time.Second)
60 }
61 }
62
63 func countMessages(outboxPath, messageType string) int {
64 rootPath := fmt.Sprintf("%s/%s", outboxPath, messageType)
65 logger := fog.New()
66
67 count := 0
68
69 err := filepath.Walk(rootPath, func(path string, f os.FileInfo, _ error) error {
70 if f != nil && !f.IsDir() {
71 r, err := regexp.MatchString(".msg", f.Name())
72 if err == nil && r {
73 count++
74 partition := filepath.Base(filepath.Dir(path))
75 outboxMessagesSizeHistogram.
76 WithLabelValues(messageType, outbox.GetFileMessageType(f), partition).
77 Observe(float64(f.Size()))
78 }
79 }
80 return nil
81 })
82
83 if err != nil {
84 logger.Error(err, "failed to count .dsm messages")
85 }
86
87 return count
88 }
89
View as plain text