package metric import ( "fmt" "os" "path/filepath" "regexp" "time" "github.com/prometheus/client_golang/prometheus" "edge-infra.dev/pkg/edge/datasync/internal/config" "edge-infra.dev/pkg/edge/datasync/internal/outbox" "edge-infra.dev/pkg/lib/fog" ) var outboxMessagesGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "data_sync_messaging_num_of_messages_in_outbox_gauge", Help: "number of messages in data-sync-messaging outboxes", }, []string{"outbox"}) var outboxMessagesSizeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "data_sync_messaging_size_of_messages_in_outbox", Help: "size of messages in data-sync-messaging outboxes by message type", // Buckets of 1KB 10KB 100KB 1MB 10MB Buckets: prometheus.ExponentialBuckets(1024, 10, 5), }, []string{"outbox", "type", "partition"}) type outboxMessagesMetric struct { chirpConfig *config.Config } func NewOutboxMessagesMetric(chirpConfig *config.Config) Metric { prometheus.MustRegister(outboxMessagesGauge) prometheus.MustRegister(outboxMessagesSizeHistogram) return &outboxMessagesMetric{chirpConfig: chirpConfig} } func (s *outboxMessagesMetric) StartRecording() { logger := fog.New() logger.Info("starting to record metrics...") outboxPath := config.GetOutboxPath(s.chirpConfig) messageTypes := config.GetRoutes(s.chirpConfig) for { // TODO: we might need to rework this logic down the road outboxMessagesSizeHistogram.Reset() for _, messageType := range messageTypes { count := countMessages(outboxPath, messageType) outboxMessagesGauge.WithLabelValues(messageType).Set(float64(count)) } time.Sleep(15 * time.Second) } } func countMessages(outboxPath, messageType string) int { rootPath := fmt.Sprintf("%s/%s", outboxPath, messageType) logger := fog.New() count := 0 err := filepath.Walk(rootPath, func(path string, f os.FileInfo, _ error) error { if f != nil && !f.IsDir() { r, err := regexp.MatchString(".msg", f.Name()) if err == nil && r { count++ partition := filepath.Base(filepath.Dir(path)) outboxMessagesSizeHistogram. WithLabelValues(messageType, outbox.GetFileMessageType(f), partition). Observe(float64(f.Size())) } } return nil }) if err != nil { logger.Error(err, "failed to count .dsm messages") } return count }