1
16
17 package healthcheck
18
19 import (
20 "fmt"
21 "net/http"
22 "sync"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/klog/v2"
27 "k8s.io/kubernetes/pkg/proxy/metrics"
28 "k8s.io/utils/clock"
29 )
30
// ToBeDeletedTaint is the taint key that SyncNode looks for on a node: a node
// carrying a taint with this key is treated as not eligible.
// NOTE(review): judging by its value, the taint is presumably applied by the
// cluster autoscaler ahead of node removal — confirm against the autoscaler.
const ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
36
37
38
39
40
41
42
43
44 type ProxierHealthServer struct {
45 listener listener
46 httpFactory httpServerFactory
47 clock clock.Clock
48
49 addr string
50 healthTimeout time.Duration
51
52 lock sync.RWMutex
53 lastUpdatedMap map[v1.IPFamily]time.Time
54 oldestPendingQueuedMap map[v1.IPFamily]time.Time
55 nodeEligible bool
56 }
57
58
59 func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
60 return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
61 }
62
63 func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
64 return &ProxierHealthServer{
65 listener: listener,
66 httpFactory: httpServerFactory,
67 clock: c,
68 addr: addr,
69 healthTimeout: healthTimeout,
70
71 lastUpdatedMap: make(map[v1.IPFamily]time.Time),
72 oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
73
74
75
76 nodeEligible: true,
77 }
78 }
79
80
81
82 func (hs *ProxierHealthServer) Updated(ipFamily v1.IPFamily) {
83 hs.lock.Lock()
84 defer hs.lock.Unlock()
85 delete(hs.oldestPendingQueuedMap, ipFamily)
86 hs.lastUpdatedMap[ipFamily] = hs.clock.Now()
87 }
88
89
90
91
92
93
94 func (hs *ProxierHealthServer) QueuedUpdate(ipFamily v1.IPFamily) {
95 hs.lock.Lock()
96 defer hs.lock.Unlock()
97
98 if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
99 hs.oldestPendingQueuedMap[ipFamily] = hs.clock.Now()
100 }
101 }
102
103
104
105 func (hs *ProxierHealthServer) IsHealthy() bool {
106 isHealthy, _ := hs.isHealthy()
107 return isHealthy
108 }
109
110 func (hs *ProxierHealthServer) isHealthy() (bool, time.Time) {
111 hs.lock.RLock()
112 defer hs.lock.RUnlock()
113
114 var lastUpdated time.Time
115 currentTime := hs.clock.Now()
116
117 for ipFamily, proxierLastUpdated := range hs.lastUpdatedMap {
118
119 if proxierLastUpdated.After(lastUpdated) {
120 lastUpdated = proxierLastUpdated
121 }
122
123 if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
124
125
126 continue
127 }
128
129 if currentTime.Sub(hs.oldestPendingQueuedMap[ipFamily]) < hs.healthTimeout {
130
131 continue
132 }
133 return false, proxierLastUpdated
134 }
135 return true, lastUpdated
136 }
137
138
139
140 func (hs *ProxierHealthServer) SyncNode(node *v1.Node) {
141 hs.lock.Lock()
142 defer hs.lock.Unlock()
143
144 if !node.DeletionTimestamp.IsZero() {
145 hs.nodeEligible = false
146 return
147 }
148 for _, taint := range node.Spec.Taints {
149 if taint.Key == ToBeDeletedTaint {
150 hs.nodeEligible = false
151 return
152 }
153 }
154 hs.nodeEligible = true
155 }
156
157
158 func (hs *ProxierHealthServer) NodeEligible() bool {
159 hs.lock.RLock()
160 defer hs.lock.RUnlock()
161 return hs.nodeEligible
162 }
163
164
165 func (hs *ProxierHealthServer) Run() error {
166 serveMux := http.NewServeMux()
167 serveMux.Handle("/healthz", healthzHandler{hs: hs})
168 serveMux.Handle("/livez", livezHandler{hs: hs})
169 server := hs.httpFactory.New(hs.addr, serveMux)
170
171 listener, err := hs.listener.Listen(hs.addr)
172 if err != nil {
173 return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
174 }
175
176 klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
177
178 if err := server.Serve(listener); err != nil {
179 return fmt.Errorf("proxier healthz closed with error: %v", err)
180 }
181 return nil
182 }
183
184 type healthzHandler struct {
185 hs *ProxierHealthServer
186 }
187
188 func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
189 nodeEligible := h.hs.NodeEligible()
190 healthy, lastUpdated := h.hs.isHealthy()
191 currentTime := h.hs.clock.Now()
192
193 healthy = healthy && nodeEligible
194 resp.Header().Set("Content-Type", "application/json")
195 resp.Header().Set("X-Content-Type-Options", "nosniff")
196 if !healthy {
197 metrics.ProxyHealthzTotal.WithLabelValues("503").Inc()
198 resp.WriteHeader(http.StatusServiceUnavailable)
199 } else {
200 metrics.ProxyHealthzTotal.WithLabelValues("200").Inc()
201 resp.WriteHeader(http.StatusOK)
202
203
204
205
206
207 lastUpdated = currentTime
208 }
209 fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q, "nodeEligible": %v}`, lastUpdated, currentTime, nodeEligible)
210 }
211
212 type livezHandler struct {
213 hs *ProxierHealthServer
214 }
215
216 func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
217 healthy, lastUpdated := h.hs.isHealthy()
218 currentTime := h.hs.clock.Now()
219 resp.Header().Set("Content-Type", "application/json")
220 resp.Header().Set("X-Content-Type-Options", "nosniff")
221 if !healthy {
222 metrics.ProxyLivezTotal.WithLabelValues("503").Inc()
223 resp.WriteHeader(http.StatusServiceUnavailable)
224 } else {
225 metrics.ProxyLivezTotal.WithLabelValues("200").Inc()
226 resp.WriteHeader(http.StatusOK)
227
228
229
230
231
232 lastUpdated = currentTime
233 }
234 fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)
235 }
236
View as plain text