1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package etcdhttp
19
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"path"
	"sort"
	"strings"
	"time"

	"go.uber.org/zap"

	"github.com/prometheus/client_golang/prometheus"
	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/server/v3/auth"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver"
)
40
const (
	// PathHealth is the URL path on which HandleHealth and HandleHealthForV2
	// register the health handler.
	PathHealth = "/health"
	// PathProxyHealth is not referenced in the visible part of this file;
	// presumably it is the health path used by the proxy — verify against callers.
	PathProxyHealth = "/proxy/health"
	// HealthStatusSuccess is the HealthStatus.Status value (and the "status"
	// metric label) recorded when a check passes.
	HealthStatusSuccess string = "success"
	// HealthStatusError is the HealthStatus.Status value (and the "status"
	// metric label) recorded when a check fails.
	HealthStatusError string = "error"
	// checkTypeLivez is the check type of the liveness registry; the registry
	// root path is "/" followed by the check type.
	checkTypeLivez = "livez"
	// checkTypeReadyz is the check type of the readiness registry.
	checkTypeReadyz = "readyz"
	// checkTypeHealth is not referenced in the visible part of this file.
	checkTypeHealth = "health"
)
50
// ServerHealth is the set of server operations the v3 health handlers in this
// file depend on.
type ServerHealth interface {
	serverHealthV2V3
	// Range is used by the health probes to issue a minimal read
	// (KeysOnly with Limit 1).
	Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
	// Config returns the server configuration; checkAPI reads ReqTimeout from it.
	Config() config.ServerConfig
	// AuthStore returns the auth store; the probes call WithRoot on it before
	// issuing Range.
	AuthStore() auth.AuthStore
}
57
// serverHealthV2V3 is the subset of server operations needed by the checks
// that HandleHealthForV2 and HandleHealth share (checkAlarms and checkLeader).
type serverHealthV2V3 interface {
	// Alarms returns the alarms inspected by checkAlarms and activeAlarmCheck.
	Alarms() []*pb.AlarmMember
	// Leader returns the leader ID; checkLeader compares it against raft.None.
	Leader() types.ID
}
62
63
64 func HandleHealthForV2(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) {
65 mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
66 if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
67 return h
68 }
69 if h := checkLeader(lg, srv, serializable); h.Health != "true" {
70 return h
71 }
72 return checkV2API(ctx, lg, srv)
73 }))
74 }
75
76
77
78 func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
79 mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
80 if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
81 return h
82 }
83 if h := checkLeader(lg, srv, serializable); h.Health != "true" {
84 return h
85 }
86 return checkAPI(ctx, lg, srv, serializable)
87 }))
88
89 installLivezEndpoints(lg, mux, srv)
90 installReadyzEndpoints(lg, mux, srv)
91 }
92
93
94 func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc {
95 return func(w http.ResponseWriter, r *http.Request) {
96 if r.Method != http.MethodGet {
97 w.Header().Set("Allow", http.MethodGet)
98 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
99 lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed))
100 return
101 }
102 excludedAlarms := getQuerySet(r, "exclude")
103
104
105
106
107 serializableFlag := getSerializableFlag(r)
108 h := hfunc(r.Context(), excludedAlarms, serializableFlag)
109 defer func() {
110 if h.Health == "true" {
111 healthSuccess.Inc()
112 } else {
113 healthFailed.Inc()
114 }
115 }()
116 d, _ := json.Marshal(h)
117 if h.Health != "true" {
118 http.Error(w, string(d), http.StatusServiceUnavailable)
119 lg.Warn("/health error", zap.String("output", string(d)), zap.Int("status-code", http.StatusServiceUnavailable))
120 return
121 }
122 w.WriteHeader(http.StatusOK)
123 w.Write(d)
124 lg.Debug("/health OK", zap.Int("status-code", http.StatusOK))
125 }
126 }
127
// Prometheus metrics for the health endpoints; all are registered in init.
var (
	// healthSuccess counts /health requests that reported a healthy result.
	healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_success",
		Help:      "The total number of successful health checks",
	})
	// healthFailed counts /health requests that reported an unhealthy result.
	healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_failures",
		Help:      "The total number of failed health checks",
	})
	// healthCheckGauge holds the latest result of each individual check,
	// labelled by check type and check name; recordMetrics sets it to 1 on
	// success and 0 otherwise.
	healthCheckGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "healthcheck",
		Help:      "The result of each kind of healthcheck.",
	},
		[]string{"type", "name"},
	)
	// healthCheckCounter counts executions of each individual check,
	// labelled by check type, check name and resulting status.
	healthCheckCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "healthchecks_total",
		Help:      "The total number of each kind of healthcheck.",
	},
		[]string{"type", "name", "status"},
	)
)
158
159 func init() {
160 prometheus.MustRegister(healthSuccess)
161 prometheus.MustRegister(healthFailed)
162 prometheus.MustRegister(healthCheckGauge)
163 prometheus.MustRegister(healthCheckCounter)
164 }
165
166
167
// Health is the JSON body served by the /health handler.
type Health struct {
	// Health is the string "true" when all checks passed and "false" otherwise.
	Health string `json:"health"`
	// Reason names the failing check; the checks in this file leave it empty
	// on success.
	Reason string `json:"reason"`
}
172
173
// HealthStatus is the aggregated outcome of one or more registry checks, as
// produced by runHealthChecks.
type HealthStatus struct {
	// Reason holds one report line per executed check ("[+]name ok" or
	// "[-]name failed: err").
	Reason string `json:"reason"`
	// Status is HealthStatusSuccess, or HealthStatusError if any check failed.
	Status string `json:"status"`
}
178
179 func getQuerySet(r *http.Request, query string) StringSet {
180 querySet := make(map[string]struct{})
181 qs, found := r.URL.Query()[query]
182 if found {
183 for _, q := range qs {
184 if len(q) == 0 {
185 continue
186 }
187 querySet[q] = struct{}{}
188 }
189 }
190 return querySet
191 }
192
// getSerializableFlag reports whether the first "serializable" query value of
// the request is exactly the string "true".
func getSerializableFlag(r *http.Request) bool {
	values := r.URL.Query()
	return values.Get("serializable") == "true"
}
196
197
198
199 func checkAlarms(lg *zap.Logger, srv serverHealthV2V3, excludedAlarms StringSet) Health {
200 h := Health{Health: "true"}
201 as := srv.Alarms()
202 if len(as) > 0 {
203 for _, v := range as {
204 alarmName := v.Alarm.String()
205 if _, found := excludedAlarms[alarmName]; found {
206 lg.Debug("/health excluded alarm", zap.String("alarm", v.String()))
207 continue
208 }
209
210 h.Health = "false"
211 switch v.Alarm {
212 case pb.AlarmType_NOSPACE:
213 h.Reason = "ALARM NOSPACE"
214 case pb.AlarmType_CORRUPT:
215 h.Reason = "ALARM CORRUPT"
216 default:
217 h.Reason = "ALARM UNKNOWN"
218 }
219 lg.Warn("serving /health false due to an alarm", zap.String("alarm", v.String()))
220 return h
221 }
222 }
223
224 return h
225 }
226
227 func checkLeader(lg *zap.Logger, srv serverHealthV2V3, serializable bool) Health {
228 h := Health{Health: "true"}
229 if !serializable && (uint64(srv.Leader()) == raft.None) {
230 h.Health = "false"
231 h.Reason = "RAFT NO LEADER"
232 lg.Warn("serving /health false; no leader")
233 }
234 return h
235 }
236
237 func checkV2API(ctx context.Context, lg *zap.Logger, srv etcdserver.ServerV2) Health {
238 h := Health{Health: "true"}
239 ctx, cancel := context.WithTimeout(ctx, time.Second)
240 _, err := srv.Do(ctx, pb.Request{Method: "QGET"})
241 cancel()
242 if err != nil {
243 h.Health = "false"
244 h.Reason = fmt.Sprintf("QGET ERROR:%s", err)
245 lg.Warn("serving /health false; QGET fails", zap.Error(err))
246 return h
247 }
248 lg.Debug("serving /health true")
249 return h
250 }
251
252 func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializable bool) Health {
253 h := Health{Health: "true"}
254 cfg := srv.Config()
255 ctx = srv.AuthStore().WithRoot(ctx)
256 cctx, cancel := context.WithTimeout(ctx, cfg.ReqTimeout())
257 _, err := srv.Range(cctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
258 cancel()
259 if err != nil {
260 h.Health = "false"
261 h.Reason = fmt.Sprintf("RANGE ERROR:%s", err)
262 lg.Warn("serving /health false; Range fails", zap.Error(err))
263 return h
264 }
265 lg.Debug("serving /health true")
266 return h
267 }
268
// HealthCheck is a single named probe; a nil error means the check passed.
type HealthCheck func(ctx context.Context) error
270
// CheckRegistry groups the named health checks served under one root path.
type CheckRegistry struct {
	// checkType names the registry; RootPath is "/" followed by this value,
	// and it is used as the "type" label when recording metrics.
	checkType string
	// checks maps a check name to the probe registered under it.
	checks map[string]HealthCheck
}
275
276 func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
277 reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
278 reg.Register("serializable_read", readCheck(server, true ))
279 reg.InstallHttpEndpoints(lg, mux)
280 }
281
282 func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
283 reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
284 reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
285
286
287
288 reg.Register("serializable_read", readCheck(server, true))
289
290 reg.Register("linearizable_read", readCheck(server, false))
291 reg.InstallHttpEndpoints(lg, mux)
292 }
293
294 func (reg *CheckRegistry) Register(name string, check HealthCheck) {
295 reg.checks[name] = check
296 }
297
298 func (reg *CheckRegistry) RootPath() string {
299 return "/" + reg.checkType
300 }
301
302 func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
303 checkNames := make([]string, 0, len(reg.checks))
304 for k := range reg.checks {
305 checkNames = append(checkNames, k)
306 }
307
308
309 reg.installRootHttpEndpoint(lg, mux, checkNames...)
310 for _, checkName := range checkNames {
311
312 subpath := path.Join(reg.RootPath(), checkName)
313 check := checkName
314 mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) HealthStatus {
315 return reg.runHealthChecks(r.Context(), check)
316 }))
317 }
318 }
319
320 func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) HealthStatus {
321 h := HealthStatus{Status: HealthStatusSuccess}
322 var individualCheckOutput bytes.Buffer
323 for _, checkName := range checkNames {
324 check, found := reg.checks[checkName]
325 if !found {
326 panic(fmt.Errorf("Health check: %s not registered", checkName))
327 }
328 if err := check(ctx); err != nil {
329 fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
330 h.Status = HealthStatusError
331 recordMetrics(reg.checkType, checkName, HealthStatusError)
332 } else {
333 fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
334 recordMetrics(reg.checkType, checkName, HealthStatusSuccess)
335 }
336 }
337 h.Reason = individualCheckOutput.String()
338 return h
339 }
340
341
342 func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) {
343 hfunc := func(r *http.Request) HealthStatus {
344
345 excluded := getQuerySet(r, "exclude")
346
347 filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded)
348 h := reg.runHealthChecks(r.Context(), filteredCheckNames...)
349 return h
350 }
351 mux.Handle(reg.RootPath(), newHealthHandler(reg.RootPath(), lg, hfunc))
352 }
353
354
355 func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) HealthStatus) http.HandlerFunc {
356 return func(w http.ResponseWriter, r *http.Request) {
357 if r.Method != http.MethodGet {
358 w.Header().Set("Allow", http.MethodGet)
359 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
360 lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed))
361 return
362 }
363 h := hfunc(r)
364
365 if h.Status == HealthStatusError {
366 http.Error(w, h.Reason, http.StatusServiceUnavailable)
367 lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable))
368 return
369 }
370 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
371 w.Header().Set("X-Content-Type-Options", "nosniff")
372
373 if _, found := r.URL.Query()["verbose"]; found {
374 fmt.Fprint(w, h.Reason)
375 }
376 fmt.Fprint(w, "ok\n")
377 lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK))
378 }
379 }
380
381 func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string {
382 filteredList := []string{}
383 for chk := range checks {
384 if _, found := excluded[chk]; found {
385 delete(excluded, chk)
386 continue
387 }
388 filteredList = append(filteredList, chk)
389 }
390 if len(excluded) > 0 {
391
392 lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...)))
393 }
394 return filteredList
395 }
396
397
398
// formatQuoted renders each name as a Go-quoted string (%q) and joins the
// results with commas. It returns the empty string when no names are given.
func formatQuoted(names ...string) string {
	quoted := make([]string, len(names))
	for i := range names {
		quoted[i] = fmt.Sprintf("%q", names[i])
	}
	return strings.Join(quoted, ",")
}
406
// StringSet is a set of strings backed by a map with empty-struct values.
type StringSet map[string]struct{}

// List returns the members of the set as a new, non-nil slice. The order is
// unspecified because it follows map iteration order.
func (s StringSet) List() []string {
	members := make([]string, len(s))
	next := 0
	for member := range s {
		members[next] = member
		next++
	}
	return members
}
416
417 func listToStringSet(list []string) StringSet {
418 set := make(map[string]struct{})
419 for _, s := range list {
420 set[s] = struct{}{}
421 }
422 return set
423 }
424
425 func recordMetrics(checkType, name string, status string) {
426 val := 0.0
427 if status == HealthStatusSuccess {
428 val = 1.0
429 }
430 healthCheckGauge.With(prometheus.Labels{
431 "type": checkType,
432 "name": name,
433 }).Set(val)
434 healthCheckCounter.With(prometheus.Labels{
435 "type": checkType,
436 "name": name,
437 "status": status,
438 }).Inc()
439 }
440
441
442 func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error {
443 return func(ctx context.Context) error {
444 as := srv.Alarms()
445 for _, v := range as {
446 if v.Alarm == at {
447 return fmt.Errorf("alarm activated: %s", at.String())
448 }
449 }
450 return nil
451 }
452 }
453
454 func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
455 return func(ctx context.Context) error {
456 ctx = srv.AuthStore().WithRoot(ctx)
457 _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
458 return err
459 }
460 }
461
View as plain text