...
1 package worker
2
3 import "github.com/prometheus/client_golang/prometheus"
4
5 type Metrics struct {
6 MessagesReadTotal *prometheus.CounterVec
7 MessagesSentTotal *prometheus.CounterVec
8 MessagesFailedToBePublishedTotal *prometheus.CounterVec
9 KafkaSuccessfulAcksTotal *prometheus.CounterVec
10 KafkaFailedAcksTotal *prometheus.CounterVec
11 KafkaLastAckedMessageGauge *prometheus.GaugeVec
12 }
13
14 func NewMetrics() *Metrics {
15 m := &Metrics{
16 MessagesReadTotal: prometheus.NewCounterVec(
17 prometheus.CounterOpts{
18 Name: "kafka_connect_pubsub__messages_read_total",
19 Help: "total number of messages the connector has read from kafka topic",
20 }, []string{"topic"}),
21 MessagesSentTotal: prometheus.NewCounterVec(
22 prometheus.CounterOpts{
23 Name: "kafka_connect_pubsub__messages_sent_total",
24 Help: "total number of messages that the connector sent to pub/sub",
25 }, []string{"topic"}),
26 MessagesFailedToBePublishedTotal: prometheus.NewCounterVec(
27 prometheus.CounterOpts{
28 Name: "kafka_connect_pubsub__messages_failed_to_be_published_total",
29 Help: "total number of messages that failed to be published to pub/sub",
30 }, []string{"topic"}),
31 KafkaSuccessfulAcksTotal: prometheus.NewCounterVec(
32 prometheus.CounterOpts{
33 Name: "kafka_connect_pubsub__successful_kafka_acks_total",
34 Help: "total number of successful kafka acks",
35 }, []string{"topic"}),
36 KafkaFailedAcksTotal: prometheus.NewCounterVec(
37 prometheus.CounterOpts{
38 Name: "kafka_connect_pubsub__failed_kafka_acks_total",
39 Help: "total number of failed kafka acks",
40 }, []string{"topic"}),
41 KafkaLastAckedMessageGauge: prometheus.NewGaugeVec(
42 prometheus.GaugeOpts{
43 Name: "kafka_connect_pubsub__last_acked_kafka_message",
44 Help: "the last kafka acked message",
45 }, []string{"topic", "partition"}),
46 }
47
48 prometheus.MustRegister(
49 m.MessagesReadTotal,
50 m.MessagesSentTotal,
51 m.MessagesFailedToBePublishedTotal,
52 m.KafkaSuccessfulAcksTotal,
53 m.KafkaFailedAcksTotal,
54 m.KafkaLastAckedMessageGauge,
55 )
56
57 return m
58 }
59
60 func (m *Metrics) Reset() {
61 m.MessagesReadTotal.Reset()
62 m.MessagesSentTotal.Reset()
63 m.KafkaSuccessfulAcksTotal.Reset()
64 m.KafkaFailedAcksTotal.Reset()
65 m.MessagesFailedToBePublishedTotal.Reset()
66 }
67
View as plain text