...

Source file src/edge-infra.dev/pkg/edge/datasync/internal/metric/outboxMessages.go

Documentation: edge-infra.dev/pkg/edge/datasync/internal/metric

     1  package metric
     2  
     3  import (
     4  	"fmt"
     5  	"os"
     6  	"path/filepath"
     7  	"regexp"
     8  	"time"
     9  
    10  	"github.com/prometheus/client_golang/prometheus"
    11  
    12  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    13  	"edge-infra.dev/pkg/edge/datasync/internal/outbox"
    14  	"edge-infra.dev/pkg/lib/fog"
    15  )
    16  
    17  var outboxMessagesGauge = prometheus.NewGaugeVec(
    18  	prometheus.GaugeOpts{
    19  		Name: "data_sync_messaging_num_of_messages_in_outbox_gauge",
    20  		Help: "number of messages in data-sync-messaging outboxes",
    21  	},
    22  	[]string{"outbox"})
    23  
    24  var outboxMessagesSizeHistogram = prometheus.NewHistogramVec(
    25  	prometheus.HistogramOpts{
    26  		Name: "data_sync_messaging_size_of_messages_in_outbox",
    27  		Help: "size of messages in data-sync-messaging outboxes by message type",
    28  		// Buckets of 1KB 10KB 100KB 1MB 10MB
    29  		Buckets: prometheus.ExponentialBuckets(1024, 10, 5),
    30  	},
    31  	[]string{"outbox", "type", "partition"})
    32  
// outboxMessagesMetric periodically scans the outbox directories and publishes
// the message count and message size metrics declared in this file.
type outboxMessagesMetric struct {
	// chirpConfig is the configuration from which the outbox path and the
	// list of routes are read when recording starts.
	chirpConfig *config.Config
}
    36  
    37  func NewOutboxMessagesMetric(chirpConfig *config.Config) Metric {
    38  	prometheus.MustRegister(outboxMessagesGauge)
    39  	prometheus.MustRegister(outboxMessagesSizeHistogram)
    40  
    41  	return &outboxMessagesMetric{chirpConfig: chirpConfig}
    42  }
    43  
    44  func (s *outboxMessagesMetric) StartRecording() {
    45  	logger := fog.New()
    46  
    47  	logger.Info("starting to record metrics...")
    48  	outboxPath := config.GetOutboxPath(s.chirpConfig)
    49  	messageTypes := config.GetRoutes(s.chirpConfig)
    50  
    51  	for {
    52  		// TODO: we might need to rework this logic down the road
    53  		outboxMessagesSizeHistogram.Reset()
    54  		for _, messageType := range messageTypes {
    55  			count := countMessages(outboxPath, messageType)
    56  			outboxMessagesGauge.WithLabelValues(messageType).Set(float64(count))
    57  		}
    58  
    59  		time.Sleep(15 * time.Second)
    60  	}
    61  }
    62  
    63  func countMessages(outboxPath, messageType string) int {
    64  	rootPath := fmt.Sprintf("%s/%s", outboxPath, messageType)
    65  	logger := fog.New()
    66  
    67  	count := 0
    68  
    69  	err := filepath.Walk(rootPath, func(path string, f os.FileInfo, _ error) error {
    70  		if f != nil && !f.IsDir() {
    71  			r, err := regexp.MatchString(".msg", f.Name())
    72  			if err == nil && r {
    73  				count++
    74  				partition := filepath.Base(filepath.Dir(path))
    75  				outboxMessagesSizeHistogram.
    76  					WithLabelValues(messageType, outbox.GetFileMessageType(f), partition).
    77  					Observe(float64(f.Size()))
    78  			}
    79  		}
    80  		return nil
    81  	})
    82  
    83  	if err != nil {
    84  		logger.Error(err, "failed to count .dsm messages")
    85  	}
    86  
    87  	return count
    88  }
    89  

View as plain text