...
1 package servicemirror
2
3 import (
4 "fmt"
5 "net"
6 "net/http"
7 "strconv"
8 "sync"
9 "time"
10
11 "github.com/linkerd/linkerd2/pkg/multicluster"
12 "github.com/prometheus/client_golang/prometheus"
13 logging "github.com/sirupsen/logrus"
14 )
15
// httpGatewayTimeoutMillis is the timeout, expressed in milliseconds, applied
// to the HTTP client that probes the gateway (50 seconds).
const httpGatewayTimeoutMillis = 50 * 1000
17
18
19 type ProbeWorker struct {
20 localGatewayName string
21 alive bool
22 Liveness chan bool
23 *sync.RWMutex
24 probeSpec *multicluster.ProbeSpec
25 stopCh chan struct{}
26 metrics *ProbeMetrics
27 log *logging.Entry
28 }
29
30
31 func NewProbeWorker(localGatewayName string, spec *multicluster.ProbeSpec, metrics *ProbeMetrics, probekey string) *ProbeWorker {
32 metrics.gatewayEnabled.Set(1)
33 return &ProbeWorker{
34 localGatewayName: localGatewayName,
35 Liveness: make(chan bool, 10),
36 RWMutex: &sync.RWMutex{},
37 probeSpec: spec,
38 stopCh: make(chan struct{}),
39 metrics: metrics,
40 log: logging.WithFields(logging.Fields{
41 "probe-key": probekey,
42 }),
43 }
44 }
45
46
47 func (pw *ProbeWorker) UpdateProbeSpec(spec *multicluster.ProbeSpec) {
48 pw.Lock()
49 pw.probeSpec = spec
50 pw.Unlock()
51 }
52
53
54 func (pw *ProbeWorker) Stop() {
55 pw.metrics.unregister()
56 pw.log.Infof("Stopping probe worker")
57 close(pw.stopCh)
58 }
59
60
61 func (pw *ProbeWorker) Start() {
62
63 pw.log.Infof("Starting probe worker")
64 go pw.run()
65 }
66
67 func (pw *ProbeWorker) run() {
68 probeTickerPeriod := pw.probeSpec.Period
69 maxJitter := pw.probeSpec.Period / 10
70 probeTicker := NewTicker(probeTickerPeriod, maxJitter)
71 defer probeTicker.Stop()
72
73 probeLoop:
74 for {
75 select {
76 case <-pw.stopCh:
77 break probeLoop
78 case <-probeTicker.C:
79 pw.doProbe()
80 }
81 }
82 }
83
84 func (pw *ProbeWorker) doProbe() {
85 pw.RLock()
86 defer pw.RUnlock()
87
88 successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
89 notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}
90
91 client := http.Client{
92 Timeout: httpGatewayTimeoutMillis * time.Millisecond,
93 }
94
95 strPort := strconv.Itoa(int(pw.probeSpec.Port))
96 urlAddress := net.JoinHostPort(pw.localGatewayName, strPort)
97 req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil)
98 if err != nil {
99 pw.log.Errorf("Could not create a GET request to gateway: %s", err)
100 return
101 }
102
103 start := time.Now()
104 resp, err := client.Do(req)
105 end := time.Since(start)
106 if err != nil {
107 pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err)
108 pw.metrics.alive.Set(0)
109 pw.metrics.probes.With(notSuccessLabel).Inc()
110 if pw.alive {
111 pw.alive = false
112 pw.Liveness <- false
113 }
114 return
115 }
116 if resp.StatusCode != 200 {
117 pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode)
118 pw.metrics.alive.Set(0)
119 pw.metrics.probes.With(notSuccessLabel).Inc()
120 if pw.alive {
121 pw.alive = false
122 pw.Liveness <- false
123 }
124 } else {
125 pw.log.Debug("Gateway is healthy")
126 pw.metrics.alive.Set(1)
127 pw.metrics.latency.Set(float64(end.Milliseconds()))
128 pw.metrics.latencies.Observe(float64(end.Milliseconds()))
129 pw.metrics.probes.With(successLabel).Inc()
130 if !pw.alive {
131 pw.alive = true
132 pw.Liveness <- true
133 }
134 }
135
136 if err := resp.Body.Close(); err != nil {
137 pw.log.Warnf("Failed to close response body %s", err)
138 }
139
140 }
141
View as plain text