...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "fmt"
19 "io/ioutil"
20 "math/rand"
21 "net/http"
22 "strings"
23 "time"
24
25 "github.com/prometheus/client_golang/prometheus"
26 "github.com/prometheus/client_golang/prometheus/promhttp"
27 "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
28 )
29
30 var (
31 watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{
32 Namespace: "etcd",
33 Subsystem: "grpc_proxy",
34 Name: "watchers_coalescing_total",
35 Help: "Total number of current watchers coalescing",
36 })
37 eventsCoalescing = prometheus.NewCounter(prometheus.CounterOpts{
38 Namespace: "etcd",
39 Subsystem: "grpc_proxy",
40 Name: "events_coalescing_total",
41 Help: "Total number of events coalescing",
42 })
43 cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{
44 Namespace: "etcd",
45 Subsystem: "grpc_proxy",
46 Name: "cache_keys_total",
47 Help: "Total number of keys/ranges cached",
48 })
49 cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{
50 Namespace: "etcd",
51 Subsystem: "grpc_proxy",
52 Name: "cache_hits_total",
53 Help: "Total number of cache hits",
54 })
55 cachedMisses = prometheus.NewGauge(prometheus.GaugeOpts{
56 Namespace: "etcd",
57 Subsystem: "grpc_proxy",
58 Name: "cache_misses_total",
59 Help: "Total number of cache misses",
60 })
61 )
62
63 func init() {
64 prometheus.MustRegister(watchersCoalescing)
65 prometheus.MustRegister(eventsCoalescing)
66 prometheus.MustRegister(cacheKeys)
67 prometheus.MustRegister(cacheHits)
68 prometheus.MustRegister(cachedMisses)
69 }
70
71
72 func HandleMetrics(mux *http.ServeMux, c *http.Client, eps []string) {
73
74 r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
75 if len(eps) > 1 {
76 eps = shuffleEndpoints(r, eps)
77 }
78
79 pathMetrics := etcdhttp.PathMetrics
80 mux.HandleFunc(pathMetrics, func(w http.ResponseWriter, r *http.Request) {
81 target := fmt.Sprintf("%s%s", eps[0], pathMetrics)
82 if !strings.HasPrefix(target, "http") {
83 scheme := "http"
84 if r.TLS != nil {
85 scheme = "https"
86 }
87 target = fmt.Sprintf("%s://%s", scheme, target)
88 }
89
90 resp, err := c.Get(target)
91 if err != nil {
92 http.Error(w, "Internal server error", http.StatusInternalServerError)
93 return
94 }
95 defer resp.Body.Close()
96 w.Header().Set("Content-Type", "text/plain; version=0.0.4")
97 body, _ := ioutil.ReadAll(resp.Body)
98 fmt.Fprintf(w, "%s", body)
99 })
100 }
101
102
103 func HandleProxyMetrics(mux *http.ServeMux) {
104 mux.Handle(etcdhttp.PathProxyMetrics, promhttp.Handler())
105 }
106
// shuffleEndpoints returns a new slice holding the elements of eps in a
// pseudo-random order drawn from r. The input slice is not modified.
func shuffleEndpoints(r *rand.Rand, eps []string) []string {
	// Perm yields a random permutation of the indices [0, len(eps)).
	order := r.Perm(len(eps))

	shuffled := make([]string, 0, len(eps))
	for _, idx := range order {
		shuffled = append(shuffled, eps[idx])
	}
	return shuffled
}
122
View as plain text