
Source file src/k8s.io/kubernetes/pkg/proxy/healthcheck/service_health.go

Documentation: k8s.io/kubernetes/pkg/proxy/healthcheck

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package healthcheck
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"net/http"
    23  	"strconv"
    24  	"strings"
    25  	"sync"
    27  	"github.com/lithammer/dedent"
    29  	v1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    32  	"k8s.io/client-go/tools/events"
    33  	"k8s.io/klog/v2"
    34  	api "k8s.io/kubernetes/pkg/apis/core"
    35  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    36  )
    38  // ServiceHealthServer serves HTTP endpoints for each service name, with results
    39  // based on the endpoints.  If there are 0 endpoints for a service, it returns a
    40  // 503 "Service Unavailable" error (telling LBs not to use this node).  If there
    41  // are 1 or more endpoints, it returns a 200 "OK".
    42  type ServiceHealthServer interface {
    43  	// Make the new set of services be active.  Services that were open before
    44  	// will be closed.  Services that are new will be opened.  Service that
    45  	// existed and are in the new set will be left alone.  The value of the map
    46  	// is the healthcheck-port to listen on.
    47  	SyncServices(newServices map[types.NamespacedName]uint16) error
    48  	// Make the new set of endpoints be active.  Endpoints for services that do
    49  	// not exist will be dropped.  The value of the map is the number of
    50  	// endpoints the service has on this node.
    51  	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
    52  }
    54  type proxierHealthChecker interface {
    55  	// IsHealthy returns the proxier's health state, following the same
    56  	// definition the HTTP server defines.
    57  	IsHealthy() bool
    58  }
    60  func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
    61  	// It doesn't matter whether we listen on "", "::", or ""; go
    62  	// treats them all the same.
    63  	nodeIPs := []net.IP{net.IPv4zero}
    65  	if !nodePortAddresses.MatchAll() {
    66  		ips, err := nodePortAddresses.GetNodeIPs(proxyutil.RealNetwork{})
    67  		if err == nil {
    68  			nodeIPs = ips
    69  		} else {
    70  			klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
    71  		}
    72  	}
    74  	return &server{
    75  		hostname:      hostname,
    76  		recorder:      recorder,
    77  		listener:      listener,
    78  		httpFactory:   factory,
    79  		healthzServer: healthzServer,
    80  		services:      map[types.NamespacedName]*hcInstance{},
    81  		nodeIPs:       nodeIPs,
    82  	}
    83  }
    85  // NewServiceHealthServer allocates a new service healthcheck server manager
    86  func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
    87  	return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
    88  }
    90  type server struct {
    91  	hostname string
    92  	// node addresses where health check port will listen on
    93  	nodeIPs     []net.IP
    94  	recorder    events.EventRecorder // can be nil
    95  	listener    listener
    96  	httpFactory httpServerFactory
    98  	healthzServer proxierHealthChecker
   100  	lock     sync.RWMutex
   101  	services map[types.NamespacedName]*hcInstance
   102  }
   104  func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
   105  	hcs.lock.Lock()
   106  	defer hcs.lock.Unlock()
   108  	// Remove any that are not needed any more.
   109  	for nsn, svc := range hcs.services {
   110  		if port, found := newServices[nsn]; !found || port != svc.port {
   111  			klog.V(2).InfoS("Closing healthcheck", "service", nsn, "port", svc.port)
   113  			// errors are loged in closeAll()
   114  			_ = svc.closeAll()
   116  			delete(hcs.services, nsn)
   118  		}
   119  	}
   121  	// Add any that are needed.
   122  	for nsn, port := range newServices {
   123  		if hcs.services[nsn] != nil {
   124  			klog.V(3).InfoS("Existing healthcheck", "service", nsn, "port", port)
   125  			continue
   126  		}
   128  		klog.V(2).InfoS("Opening healthcheck", "service", nsn, "port", port)
   130  		svc := &hcInstance{nsn: nsn, port: port}
   131  		err := svc.listenAndServeAll(hcs)
   133  		if err != nil {
   134  			msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
   136  			if hcs.recorder != nil {
   137  				hcs.recorder.Eventf(
   138  					&v1.ObjectReference{
   139  						Kind:      "Service",
   140  						Namespace: nsn.Namespace,
   141  						Name:      nsn.Name,
   142  						UID:       types.UID(nsn.String()),
   143  					}, nil, api.EventTypeWarning, "FailedToStartServiceHealthcheck", "Listen", msg)
   144  			}
   145  			klog.ErrorS(err, "Failed to start healthcheck", "node", hcs.hostname, "service", nsn, "port", port)
   146  			continue
   147  		}
   148  		hcs.services[nsn] = svc
   149  	}
   150  	return nil
   151  }
   153  type hcInstance struct {
   154  	nsn  types.NamespacedName
   155  	port uint16
   157  	httpServers []httpServer
   159  	endpoints int // number of local endpoints for a service
   160  }
   162  // listenAll opens health check port on all the addresses provided
   163  func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
   164  	var err error
   165  	var listener net.Listener
   167  	hcI.httpServers = make([]httpServer, 0, len(hcs.nodeIPs))
   169  	// for each of the node addresses start listening and serving
   170  	for _, ip := range hcs.nodeIPs {
   171  		addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
   172  		// create http server
   173  		httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
   174  		// start listener
   175  		listener, err = hcs.listener.Listen(addr)
   176  		if err != nil {
   177  			// must close whatever have been previously opened
   178  			// to allow a retry/or port ownership change as needed
   179  			_ = hcI.closeAll()
   180  			return err
   181  		}
   183  		// start serving
   184  		go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) {
   185  			// Serve() will exit and return ErrServerClosed when the http server is closed.
   186  			klog.V(3).InfoS("Starting goroutine for healthcheck", "service", hcI.nsn, "address", listener.Addr())
   187  			if err := httpSrv.Serve(listener); err != nil && err != http.ErrServerClosed {
   188  				klog.ErrorS(err, "Healthcheck closed", "service", hcI.nsn)
   189  				return
   190  			}
   191  			klog.V(3).InfoS("Healthcheck closed", "service", hcI.nsn, "address", listener.Addr())
   192  		}(hcI, listener, httpSrv)
   194  		hcI.httpServers = append(hcI.httpServers, httpSrv)
   195  	}
   197  	return nil
   198  }
   200  func (hcI *hcInstance) closeAll() error {
   201  	errors := []error{}
   202  	for _, server := range hcI.httpServers {
   203  		if err := server.Close(); err != nil {
   204  			klog.ErrorS(err, "Error closing server for health check service", "service", hcI.nsn)
   205  			errors = append(errors, err)
   206  		}
   207  	}
   209  	if len(errors) > 0 {
   210  		return utilerrors.NewAggregate(errors)
   211  	}
   213  	return nil
   214  }
   216  type hcHandler struct {
   217  	name types.NamespacedName
   218  	hcs  *server
   219  }
   221  var _ http.Handler = hcHandler{}
   223  func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
   224  	h.hcs.lock.RLock()
   225  	svc, ok := h.hcs.services[h.name]
   226  	if !ok || svc == nil {
   227  		h.hcs.lock.RUnlock()
   228  		klog.ErrorS(nil, "Received request for closed healthcheck", "service", h.name)
   229  		return
   230  	}
   231  	count := svc.endpoints
   232  	h.hcs.lock.RUnlock()
   233  	kubeProxyHealthy := h.hcs.healthzServer.IsHealthy()
   235  	resp.Header().Set("Content-Type", "application/json")
   236  	resp.Header().Set("X-Content-Type-Options", "nosniff")
   237  	resp.Header().Set("X-Load-Balancing-Endpoint-Weight", strconv.Itoa(count))
   239  	if count != 0 && kubeProxyHealthy {
   240  		resp.WriteHeader(http.StatusOK)
   241  	} else {
   242  		resp.WriteHeader(http.StatusServiceUnavailable)
   243  	}
   244  	fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
   245  		{
   246  			"service": {
   247  				"namespace": %q,
   248  				"name": %q
   249  			},
   250  			"localEndpoints": %d,
   251  			"serviceProxyHealthy": %v
   252  		}
   253  		`, h.name.Namespace, h.name.Name, count, kubeProxyHealthy)), "\n"))
   254  }
   256  func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
   257  	hcs.lock.Lock()
   258  	defer hcs.lock.Unlock()
   260  	for nsn, count := range newEndpoints {
   261  		if hcs.services[nsn] == nil {
   262  			continue
   263  		}
   264  		klog.V(3).InfoS("Reporting endpoints for healthcheck", "endpointCount", count, "service", nsn)
   265  		hcs.services[nsn].endpoints = count
   266  	}
   267  	for nsn, hci := range hcs.services {
   268  		if _, found := newEndpoints[nsn]; !found {
   269  			hci.endpoints = 0
   270  		}
   271  	}
   272  	return nil
   273  }
   275  // FakeServiceHealthServer is a fake ServiceHealthServer for test programs
   276  type FakeServiceHealthServer struct{}
   278  // NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
   279  func NewFakeServiceHealthServer() ServiceHealthServer {
   280  	return FakeServiceHealthServer{}
   281  }
   283  // SyncServices is part of ServiceHealthServer
   284  func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
   285  	return nil
   286  }
   288  // SyncEndpoints is part of ServiceHealthServer
   289  func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
   290  	return nil
   291  }

View as plain text