...

Source file src/k8s.io/kubernetes/pkg/proxy/healthcheck/service_health.go

Documentation: k8s.io/kubernetes/pkg/proxy/healthcheck

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package healthcheck
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"net/http"
    23  	"strconv"
    24  	"strings"
    25  	"sync"
    26  
    27  	"github.com/lithammer/dedent"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    32  	"k8s.io/client-go/tools/events"
    33  	"k8s.io/klog/v2"
    34  	api "k8s.io/kubernetes/pkg/apis/core"
    35  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    36  )
    37  
// ServiceHealthServer serves HTTP endpoints for each service name, with results
// based on the endpoints.  If there are 0 endpoints for a service, it returns a
// 503 "Service Unavailable" error (telling LBs not to use this node).  If there
// are 1 or more endpoints and the proxier itself reports healthy, it returns a
// 200 "OK".
type ServiceHealthServer interface {
	// SyncServices makes the new set of services active.  Services that were
	// open before but are not in the new set will be closed.  Services that
	// are new will be opened.  Services that existed and are in the new set
	// will be left alone.  The value of the map is the healthcheck-port to
	// listen on.
	SyncServices(newServices map[types.NamespacedName]uint16) error
	// SyncEndpoints makes the new set of endpoints active.  Endpoints for
	// services that do not exist will be dropped.  The value of the map is
	// the number of endpoints the service has on this node.
	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
    53  
// proxierHealthChecker exposes the proxier's overall health.  The per-service
// healthcheck handler in this file consults it on each request and only
// answers 200 when the proxier is healthy.
type proxierHealthChecker interface {
	// IsHealthy returns the proxier's health state, following the same
	// definition the HTTP server defines.
	IsHealthy() bool
}
    59  
    60  func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
    61  	// It doesn't matter whether we listen on "0.0.0.0", "::", or ""; go
    62  	// treats them all the same.
    63  	nodeIPs := []net.IP{net.IPv4zero}
    64  
    65  	if !nodePortAddresses.MatchAll() {
    66  		ips, err := nodePortAddresses.GetNodeIPs(proxyutil.RealNetwork{})
    67  		if err == nil {
    68  			nodeIPs = ips
    69  		} else {
    70  			klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
    71  		}
    72  	}
    73  
    74  	return &server{
    75  		hostname:      hostname,
    76  		recorder:      recorder,
    77  		listener:      listener,
    78  		httpFactory:   factory,
    79  		healthzServer: healthzServer,
    80  		services:      map[types.NamespacedName]*hcInstance{},
    81  		nodeIPs:       nodeIPs,
    82  	}
    83  }
    84  
    85  // NewServiceHealthServer allocates a new service healthcheck server manager
    86  func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
    87  	return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
    88  }
    89  
// server is the ServiceHealthServer implementation in this file.  It keeps one
// hcInstance per service in services and opens each service's healthcheck
// port on every address in nodeIPs.
//
// hostname is only used in the event message and error log emitted when a
// healthcheck fails to start.  listener and httpFactory are the hooks used to
// open sockets and create HTTP servers.  healthzServer is queried by the
// request handler for the proxier's health.  lock is held (write) by
// SyncServices and SyncEndpoints and (read) by the request handler while they
// access services and the per-service endpoint counts.
type server struct {
	hostname string
	// node addresses where health check port will listen on
	nodeIPs     []net.IP
	recorder    events.EventRecorder // can be nil
	listener    listener
	httpFactory httpServerFactory

	healthzServer proxierHealthChecker

	lock     sync.RWMutex
	services map[types.NamespacedName]*hcInstance
}
   103  
   104  func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
   105  	hcs.lock.Lock()
   106  	defer hcs.lock.Unlock()
   107  
   108  	// Remove any that are not needed any more.
   109  	for nsn, svc := range hcs.services {
   110  		if port, found := newServices[nsn]; !found || port != svc.port {
   111  			klog.V(2).InfoS("Closing healthcheck", "service", nsn, "port", svc.port)
   112  
   113  			// errors are loged in closeAll()
   114  			_ = svc.closeAll()
   115  
   116  			delete(hcs.services, nsn)
   117  
   118  		}
   119  	}
   120  
   121  	// Add any that are needed.
   122  	for nsn, port := range newServices {
   123  		if hcs.services[nsn] != nil {
   124  			klog.V(3).InfoS("Existing healthcheck", "service", nsn, "port", port)
   125  			continue
   126  		}
   127  
   128  		klog.V(2).InfoS("Opening healthcheck", "service", nsn, "port", port)
   129  
   130  		svc := &hcInstance{nsn: nsn, port: port}
   131  		err := svc.listenAndServeAll(hcs)
   132  
   133  		if err != nil {
   134  			msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
   135  
   136  			if hcs.recorder != nil {
   137  				hcs.recorder.Eventf(
   138  					&v1.ObjectReference{
   139  						Kind:      "Service",
   140  						Namespace: nsn.Namespace,
   141  						Name:      nsn.Name,
   142  						UID:       types.UID(nsn.String()),
   143  					}, nil, api.EventTypeWarning, "FailedToStartServiceHealthcheck", "Listen", msg)
   144  			}
   145  			klog.ErrorS(err, "Failed to start healthcheck", "node", hcs.hostname, "service", nsn, "port", port)
   146  			continue
   147  		}
   148  		hcs.services[nsn] = svc
   149  	}
   150  	return nil
   151  }
   152  
// hcInstance holds the healthcheck state of a single service: its name (nsn),
// the healthcheck port, the HTTP servers started for that port (one per
// listen address), and the latest local endpoint count.
type hcInstance struct {
	nsn  types.NamespacedName
	port uint16

	httpServers []httpServer

	endpoints int // number of local endpoints for a service
}
   161  
   162  // listenAll opens health check port on all the addresses provided
   163  func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
   164  	var err error
   165  	var listener net.Listener
   166  
   167  	hcI.httpServers = make([]httpServer, 0, len(hcs.nodeIPs))
   168  
   169  	// for each of the node addresses start listening and serving
   170  	for _, ip := range hcs.nodeIPs {
   171  		addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
   172  		// create http server
   173  		httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
   174  		// start listener
   175  		listener, err = hcs.listener.Listen(addr)
   176  		if err != nil {
   177  			// must close whatever have been previously opened
   178  			// to allow a retry/or port ownership change as needed
   179  			_ = hcI.closeAll()
   180  			return err
   181  		}
   182  
   183  		// start serving
   184  		go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) {
   185  			// Serve() will exit and return ErrServerClosed when the http server is closed.
   186  			klog.V(3).InfoS("Starting goroutine for healthcheck", "service", hcI.nsn, "address", listener.Addr())
   187  			if err := httpSrv.Serve(listener); err != nil && err != http.ErrServerClosed {
   188  				klog.ErrorS(err, "Healthcheck closed", "service", hcI.nsn)
   189  				return
   190  			}
   191  			klog.V(3).InfoS("Healthcheck closed", "service", hcI.nsn, "address", listener.Addr())
   192  		}(hcI, listener, httpSrv)
   193  
   194  		hcI.httpServers = append(hcI.httpServers, httpSrv)
   195  	}
   196  
   197  	return nil
   198  }
   199  
   200  func (hcI *hcInstance) closeAll() error {
   201  	errors := []error{}
   202  	for _, server := range hcI.httpServers {
   203  		if err := server.Close(); err != nil {
   204  			klog.ErrorS(err, "Error closing server for health check service", "service", hcI.nsn)
   205  			errors = append(errors, err)
   206  		}
   207  	}
   208  
   209  	if len(errors) > 0 {
   210  		return utilerrors.NewAggregate(errors)
   211  	}
   212  
   213  	return nil
   214  }
   215  
// hcHandler is the HTTP handler installed on a service's healthcheck port.
// name identifies the service being reported on and hcs is the server whose
// state (service table and proxier health) the response is derived from.
type hcHandler struct {
	name types.NamespacedName
	hcs  *server
}

// Compile-time check that hcHandler satisfies http.Handler.
var _ http.Handler = hcHandler{}
   222  
   223  func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
   224  	h.hcs.lock.RLock()
   225  	svc, ok := h.hcs.services[h.name]
   226  	if !ok || svc == nil {
   227  		h.hcs.lock.RUnlock()
   228  		klog.ErrorS(nil, "Received request for closed healthcheck", "service", h.name)
   229  		return
   230  	}
   231  	count := svc.endpoints
   232  	h.hcs.lock.RUnlock()
   233  	kubeProxyHealthy := h.hcs.healthzServer.IsHealthy()
   234  
   235  	resp.Header().Set("Content-Type", "application/json")
   236  	resp.Header().Set("X-Content-Type-Options", "nosniff")
   237  	resp.Header().Set("X-Load-Balancing-Endpoint-Weight", strconv.Itoa(count))
   238  
   239  	if count != 0 && kubeProxyHealthy {
   240  		resp.WriteHeader(http.StatusOK)
   241  	} else {
   242  		resp.WriteHeader(http.StatusServiceUnavailable)
   243  	}
   244  	fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
   245  		{
   246  			"service": {
   247  				"namespace": %q,
   248  				"name": %q
   249  			},
   250  			"localEndpoints": %d,
   251  			"serviceProxyHealthy": %v
   252  		}
   253  		`, h.name.Namespace, h.name.Name, count, kubeProxyHealthy)), "\n"))
   254  }
   255  
   256  func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
   257  	hcs.lock.Lock()
   258  	defer hcs.lock.Unlock()
   259  
   260  	for nsn, count := range newEndpoints {
   261  		if hcs.services[nsn] == nil {
   262  			continue
   263  		}
   264  		klog.V(3).InfoS("Reporting endpoints for healthcheck", "endpointCount", count, "service", nsn)
   265  		hcs.services[nsn].endpoints = count
   266  	}
   267  	for nsn, hci := range hcs.services {
   268  		if _, found := newEndpoints[nsn]; !found {
   269  			hci.endpoints = 0
   270  		}
   271  	}
   272  	return nil
   273  }
   274  
// FakeServiceHealthServer is a fake ServiceHealthServer for test programs.
// It carries no state; its methods in this file accept any input and return
// nil.
type FakeServiceHealthServer struct{}
   277  
   278  // NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
   279  func NewFakeServiceHealthServer() ServiceHealthServer {
   280  	return FakeServiceHealthServer{}
   281  }
   282  
   283  // SyncServices is part of ServiceHealthServer
   284  func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
   285  	return nil
   286  }
   287  
   288  // SyncEndpoints is part of ServiceHealthServer
   289  func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
   290  	return nil
   291  }
   292  

View as plain text