1
16
17 package healthcheck
18
19 import (
20 "fmt"
21 "net"
22 "net/http"
23 "strconv"
24 "strings"
25 "sync"
26
27 "github.com/lithammer/dedent"
28
29 v1 "k8s.io/api/core/v1"
30 "k8s.io/apimachinery/pkg/types"
31 utilerrors "k8s.io/apimachinery/pkg/util/errors"
32 "k8s.io/client-go/tools/events"
33 "k8s.io/klog/v2"
34 api "k8s.io/kubernetes/pkg/apis/core"
35 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
36 )
37
38
39
40
41
// ServiceHealthServer is the interface through which callers keep the
// per-service health checks up to date: which services have a health check
// port, and how many endpoints each of those services currently has.
type ServiceHealthServer interface {
	// SyncServices takes the full desired set of services, keyed by
	// namespaced name, with each service's health check port as the value.
	SyncServices(newServices map[types.NamespacedName]uint16) error

	// SyncEndpoints takes the endpoint count to report for each service,
	// keyed by namespaced name.
	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
53
// proxierHealthChecker supplies a single healthy/unhealthy status. The health
// check handler in this file queries it on every request and combines the
// result with the service's endpoint count to pick the response status.
type proxierHealthChecker interface {
	// IsHealthy returns the current health status.
	IsHealthy() bool
}
59
60 func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
61
62
63 nodeIPs := []net.IP{net.IPv4zero}
64
65 if !nodePortAddresses.MatchAll() {
66 ips, err := nodePortAddresses.GetNodeIPs(proxyutil.RealNetwork{})
67 if err == nil {
68 nodeIPs = ips
69 } else {
70 klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
71 }
72 }
73
74 return &server{
75 hostname: hostname,
76 recorder: recorder,
77 listener: listener,
78 httpFactory: factory,
79 healthzServer: healthzServer,
80 services: map[types.NamespacedName]*hcInstance{},
81 nodeIPs: nodeIPs,
82 }
83 }
84
85
86 func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
87 return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
88 }
89
// server is the ServiceHealthServer implementation defined in this file. It
// keeps one hcInstance per service for which a health check is running.
type server struct {
	// hostname is included in the log lines and events emitted when a
	// health check fails to start.
	hostname string

	// nodeIPs are the addresses on which every health check port is bound.
	nodeIPs []net.IP
	// recorder receives a warning event when a health check cannot be
	// started; it is skipped when nil.
	recorder events.EventRecorder
	// listener opens the net.Listener for a health check address.
	listener listener
	// httpFactory creates the HTTP server that serves on that listener.
	httpFactory httpServerFactory

	// healthzServer provides the health status reported alongside the
	// endpoint count in every response.
	healthzServer proxierHealthChecker

	// lock is held for writing by SyncServices and SyncEndpoints and for
	// reading by the HTTP handler while it looks up a service in services.
	lock     sync.RWMutex
	services map[types.NamespacedName]*hcInstance
}
103
104 func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
105 hcs.lock.Lock()
106 defer hcs.lock.Unlock()
107
108
109 for nsn, svc := range hcs.services {
110 if port, found := newServices[nsn]; !found || port != svc.port {
111 klog.V(2).InfoS("Closing healthcheck", "service", nsn, "port", svc.port)
112
113
114 _ = svc.closeAll()
115
116 delete(hcs.services, nsn)
117
118 }
119 }
120
121
122 for nsn, port := range newServices {
123 if hcs.services[nsn] != nil {
124 klog.V(3).InfoS("Existing healthcheck", "service", nsn, "port", port)
125 continue
126 }
127
128 klog.V(2).InfoS("Opening healthcheck", "service", nsn, "port", port)
129
130 svc := &hcInstance{nsn: nsn, port: port}
131 err := svc.listenAndServeAll(hcs)
132
133 if err != nil {
134 msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
135
136 if hcs.recorder != nil {
137 hcs.recorder.Eventf(
138 &v1.ObjectReference{
139 Kind: "Service",
140 Namespace: nsn.Namespace,
141 Name: nsn.Name,
142 UID: types.UID(nsn.String()),
143 }, nil, api.EventTypeWarning, "FailedToStartServiceHealthcheck", "Listen", msg)
144 }
145 klog.ErrorS(err, "Failed to start healthcheck", "node", hcs.hostname, "service", nsn, "port", port)
146 continue
147 }
148 hcs.services[nsn] = svc
149 }
150 return nil
151 }
152
// hcInstance is the state kept for one service's health check.
type hcInstance struct {
	// nsn identifies the service this health check belongs to.
	nsn types.NamespacedName
	// port is the health check port; it is bound on every node IP.
	port uint16

	// httpServers holds one HTTP server per address that was successfully
	// bound, so that closeAll can shut all of them down.
	httpServers []httpServer

	// endpoints is the endpoint count last set by SyncEndpoints; the HTTP
	// handler reports it in each response.
	endpoints int
}
161
162
163 func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
164 var err error
165 var listener net.Listener
166
167 hcI.httpServers = make([]httpServer, 0, len(hcs.nodeIPs))
168
169
170 for _, ip := range hcs.nodeIPs {
171 addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
172
173 httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
174
175 listener, err = hcs.listener.Listen(addr)
176 if err != nil {
177
178
179 _ = hcI.closeAll()
180 return err
181 }
182
183
184 go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) {
185
186 klog.V(3).InfoS("Starting goroutine for healthcheck", "service", hcI.nsn, "address", listener.Addr())
187 if err := httpSrv.Serve(listener); err != nil && err != http.ErrServerClosed {
188 klog.ErrorS(err, "Healthcheck closed", "service", hcI.nsn)
189 return
190 }
191 klog.V(3).InfoS("Healthcheck closed", "service", hcI.nsn, "address", listener.Addr())
192 }(hcI, listener, httpSrv)
193
194 hcI.httpServers = append(hcI.httpServers, httpSrv)
195 }
196
197 return nil
198 }
199
200 func (hcI *hcInstance) closeAll() error {
201 errors := []error{}
202 for _, server := range hcI.httpServers {
203 if err := server.Close(); err != nil {
204 klog.ErrorS(err, "Error closing server for health check service", "service", hcI.nsn)
205 errors = append(errors, err)
206 }
207 }
208
209 if len(errors) > 0 {
210 return utilerrors.NewAggregate(errors)
211 }
212
213 return nil
214 }
215
216 type hcHandler struct {
217 name types.NamespacedName
218 hcs *server
219 }
220
221 var _ http.Handler = hcHandler{}
222
223 func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
224 h.hcs.lock.RLock()
225 svc, ok := h.hcs.services[h.name]
226 if !ok || svc == nil {
227 h.hcs.lock.RUnlock()
228 klog.ErrorS(nil, "Received request for closed healthcheck", "service", h.name)
229 return
230 }
231 count := svc.endpoints
232 h.hcs.lock.RUnlock()
233 kubeProxyHealthy := h.hcs.healthzServer.IsHealthy()
234
235 resp.Header().Set("Content-Type", "application/json")
236 resp.Header().Set("X-Content-Type-Options", "nosniff")
237 resp.Header().Set("X-Load-Balancing-Endpoint-Weight", strconv.Itoa(count))
238
239 if count != 0 && kubeProxyHealthy {
240 resp.WriteHeader(http.StatusOK)
241 } else {
242 resp.WriteHeader(http.StatusServiceUnavailable)
243 }
244 fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
245 {
246 "service": {
247 "namespace": %q,
248 "name": %q
249 },
250 "localEndpoints": %d,
251 "serviceProxyHealthy": %v
252 }
253 `, h.name.Namespace, h.name.Name, count, kubeProxyHealthy)), "\n"))
254 }
255
256 func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
257 hcs.lock.Lock()
258 defer hcs.lock.Unlock()
259
260 for nsn, count := range newEndpoints {
261 if hcs.services[nsn] == nil {
262 continue
263 }
264 klog.V(3).InfoS("Reporting endpoints for healthcheck", "endpointCount", count, "service", nsn)
265 hcs.services[nsn].endpoints = count
266 }
267 for nsn, hci := range hcs.services {
268 if _, found := newEndpoints[nsn]; !found {
269 hci.endpoints = 0
270 }
271 }
272 return nil
273 }
274
275
276 type FakeServiceHealthServer struct{}
277
278
279 func NewFakeServiceHealthServer() ServiceHealthServer {
280 return FakeServiceHealthServer{}
281 }
282
283
284 func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
285 return nil
286 }
287
288
289 func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
290 return nil
291 }
292
View as plain text