...

Source file src/k8s.io/kubernetes/pkg/proxy/healthcheck/healthcheck_test.go

Documentation: k8s.io/kubernetes/pkg/proxy/healthcheck

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package healthcheck
    18  
    19  import (
    20  	"encoding/json"
    21  	"net"
    22  	"net/http"
    23  	"net/http/httptest"
    24  	"strconv"
    25  	"testing"
    26  	"time"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	v1 "k8s.io/api/core/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/component-base/metrics/testutil"
    32  
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/dump"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  
    37  	basemetrics "k8s.io/component-base/metrics"
    38  	"k8s.io/kubernetes/pkg/proxy/metrics"
    39  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    40  	testingclock "k8s.io/utils/clock/testing"
    41  )
    42  
    43  type fakeListener struct {
    44  	openPorts sets.Set[string]
    45  }
    46  
    47  func newFakeListener() *fakeListener {
    48  	return &fakeListener{
    49  		openPorts: sets.Set[string]{},
    50  	}
    51  }
    52  
    53  func (fake *fakeListener) hasPort(addr string) bool {
    54  	return fake.openPorts.Has(addr)
    55  }
    56  
    57  func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
    58  	fake.openPorts.Insert(addr)
    59  	return &fakeNetListener{
    60  		parent: fake,
    61  		addr:   addr,
    62  	}, nil
    63  }
    64  
    65  type fakeNetListener struct {
    66  	parent *fakeListener
    67  	addr   string
    68  }
    69  
    70  type fakeAddr struct {
    71  }
    72  
    73  func (fa fakeAddr) Network() string {
    74  	return "tcp"
    75  }
    76  func (fa fakeAddr) String() string {
    77  	return "<test>"
    78  }
    79  func (fake *fakeNetListener) Accept() (net.Conn, error) {
    80  	// Not implemented
    81  	return nil, nil
    82  }
    83  
    84  func (fake *fakeNetListener) Close() error {
    85  	fake.parent.openPorts.Delete(fake.addr)
    86  	return nil
    87  }
    88  
    89  func (fake *fakeNetListener) Addr() net.Addr {
    90  	// Not implemented
    91  	return fakeAddr{}
    92  }
    93  
    94  type fakeHTTPServerFactory struct{}
    95  
    96  func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
    97  	return &fakeHTTPServerFactory{}
    98  }
    99  
   100  func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
   101  	return &fakeHTTPServer{
   102  		addr:    addr,
   103  		handler: handler,
   104  	}
   105  }
   106  
   107  type fakeHTTPServer struct {
   108  	addr    string
   109  	handler http.Handler
   110  }
   111  
   112  func (fake *fakeHTTPServer) Serve(listener net.Listener) error {
   113  	return nil // Cause the goroutine to return
   114  }
   115  
   116  func (fake *fakeHTTPServer) Close() error {
   117  	return nil
   118  }
   119  
   120  func mknsn(ns, name string) types.NamespacedName {
   121  	return types.NamespacedName{
   122  		Namespace: ns,
   123  		Name:      name,
   124  	}
   125  }
   126  
   127  type hcPayload struct {
   128  	Service struct {
   129  		Namespace string
   130  		Name      string
   131  	}
   132  	LocalEndpoints      int
   133  	ServiceProxyHealthy bool
   134  }
   135  
   136  type healthzPayload struct {
   137  	LastUpdated string
   138  	CurrentTime string
   139  	NodeHealthy bool
   140  }
   141  
   142  type fakeProxierHealthChecker struct {
   143  	healthy bool
   144  }
   145  
   146  func (fake fakeProxierHealthChecker) IsHealthy() bool {
   147  	return fake.healthy
   148  }
   149  
   150  func TestServer(t *testing.T) {
   151  	listener := newFakeListener()
   152  	httpFactory := newFakeHTTPServerFactory()
   153  	nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{}, nil)
   154  	proxyChecker := &fakeProxierHealthChecker{true}
   155  
   156  	hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
   157  	hcs := hcsi.(*server)
   158  	if len(hcs.services) != 0 {
   159  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   160  	}
   161  
   162  	// sync nothing
   163  	hcs.SyncServices(nil)
   164  	if len(hcs.services) != 0 {
   165  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   166  	}
   167  	hcs.SyncEndpoints(nil)
   168  	if len(hcs.services) != 0 {
   169  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   170  	}
   171  
   172  	// sync unknown endpoints, should be dropped
   173  	hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93})
   174  	if len(hcs.services) != 0 {
   175  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   176  	}
   177  
   178  	// sync a real service
   179  	nsn := mknsn("a", "b")
   180  	hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376})
   181  	if len(hcs.services) != 1 {
   182  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   183  	}
   184  	if hcs.services[nsn].endpoints != 0 {
   185  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
   186  	}
   187  	if len(listener.openPorts) != 1 {
   188  		t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), dump.Pretty(listener.openPorts))
   189  	}
   190  	if !listener.hasPort("0.0.0.0:9376") {
   191  		t.Errorf("expected port :9376 to be open\n%s", dump.Pretty(listener.openPorts))
   192  	}
   193  	// test the handler
   194  	testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
   195  
   196  	// sync an endpoint
   197  	hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18})
   198  	if len(hcs.services) != 1 {
   199  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   200  	}
   201  	if hcs.services[nsn].endpoints != 18 {
   202  		t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints)
   203  	}
   204  	// test the handler
   205  	testHandler(hcs, nsn, http.StatusOK, 18, t)
   206  
   207  	// sync zero endpoints
   208  	hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 0})
   209  	if len(hcs.services) != 1 {
   210  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   211  	}
   212  	if hcs.services[nsn].endpoints != 0 {
   213  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
   214  	}
   215  	// test the handler
   216  	testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
   217  
   218  	// put the endpoint back
   219  	hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 11})
   220  	if len(hcs.services) != 1 {
   221  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   222  	}
   223  	if hcs.services[nsn].endpoints != 11 {
   224  		t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints)
   225  	}
   226  	// sync nil endpoints
   227  	hcs.SyncEndpoints(nil)
   228  	if len(hcs.services) != 1 {
   229  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   230  	}
   231  	if hcs.services[nsn].endpoints != 0 {
   232  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
   233  	}
   234  	// test the handler
   235  	testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
   236  
   237  	// put the endpoint back
   238  	hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18})
   239  	if len(hcs.services) != 1 {
   240  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   241  	}
   242  	if hcs.services[nsn].endpoints != 18 {
   243  		t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints)
   244  	}
   245  	// delete the service
   246  	hcs.SyncServices(nil)
   247  	if len(hcs.services) != 0 {
   248  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   249  	}
   250  
   251  	// sync multiple services
   252  	nsn1 := mknsn("a", "b")
   253  	nsn2 := mknsn("c", "d")
   254  	nsn3 := mknsn("e", "f")
   255  	nsn4 := mknsn("g", "h")
   256  	hcs.SyncServices(map[types.NamespacedName]uint16{
   257  		nsn1: 9376,
   258  		nsn2: 12909,
   259  		nsn3: 11113,
   260  	})
   261  	if len(hcs.services) != 3 {
   262  		t.Errorf("expected 3 service, got %d", len(hcs.services))
   263  	}
   264  	if hcs.services[nsn1].endpoints != 0 {
   265  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn1].endpoints)
   266  	}
   267  	if hcs.services[nsn2].endpoints != 0 {
   268  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn2].endpoints)
   269  	}
   270  	if hcs.services[nsn3].endpoints != 0 {
   271  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints)
   272  	}
   273  	if len(listener.openPorts) != 3 {
   274  		t.Errorf("expected 3 open ports, got %d\n%s", len(listener.openPorts), dump.Pretty(listener.openPorts))
   275  	}
   276  	// test the handlers
   277  	testHandler(hcs, nsn1, http.StatusServiceUnavailable, 0, t)
   278  	testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t)
   279  	testHandler(hcs, nsn3, http.StatusServiceUnavailable, 0, t)
   280  
   281  	// sync endpoints
   282  	hcs.SyncEndpoints(map[types.NamespacedName]int{
   283  		nsn1: 9,
   284  		nsn2: 3,
   285  		nsn3: 7,
   286  	})
   287  	if len(hcs.services) != 3 {
   288  		t.Errorf("expected 3 services, got %d", len(hcs.services))
   289  	}
   290  	if hcs.services[nsn1].endpoints != 9 {
   291  		t.Errorf("expected 9 endpoints, got %d", hcs.services[nsn1].endpoints)
   292  	}
   293  	if hcs.services[nsn2].endpoints != 3 {
   294  		t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
   295  	}
   296  	if hcs.services[nsn3].endpoints != 7 {
   297  		t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints)
   298  	}
   299  	// test the handlers
   300  	testHandler(hcs, nsn1, http.StatusOK, 9, t)
   301  	testHandler(hcs, nsn2, http.StatusOK, 3, t)
   302  	testHandler(hcs, nsn3, http.StatusOK, 7, t)
   303  
   304  	// sync new services
   305  	hcs.SyncServices(map[types.NamespacedName]uint16{
   306  		//nsn1: 9376, // remove it
   307  		nsn2: 12909, // leave it
   308  		nsn3: 11114, // change it
   309  		nsn4: 11878, // add it
   310  	})
   311  	if len(hcs.services) != 3 {
   312  		t.Errorf("expected 3 service, got %d", len(hcs.services))
   313  	}
   314  	if hcs.services[nsn2].endpoints != 3 {
   315  		t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
   316  	}
   317  	if hcs.services[nsn3].endpoints != 0 {
   318  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints)
   319  	}
   320  	if hcs.services[nsn4].endpoints != 0 {
   321  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn4].endpoints)
   322  	}
   323  	// test the handlers
   324  	testHandler(hcs, nsn2, http.StatusOK, 3, t)
   325  	testHandler(hcs, nsn3, http.StatusServiceUnavailable, 0, t)
   326  	testHandler(hcs, nsn4, http.StatusServiceUnavailable, 0, t)
   327  
   328  	// sync endpoints
   329  	hcs.SyncEndpoints(map[types.NamespacedName]int{
   330  		nsn1: 9,
   331  		nsn2: 3,
   332  		nsn3: 7,
   333  		nsn4: 6,
   334  	})
   335  	if len(hcs.services) != 3 {
   336  		t.Errorf("expected 3 services, got %d", len(hcs.services))
   337  	}
   338  	if hcs.services[nsn2].endpoints != 3 {
   339  		t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
   340  	}
   341  	if hcs.services[nsn3].endpoints != 7 {
   342  		t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints)
   343  	}
   344  	if hcs.services[nsn4].endpoints != 6 {
   345  		t.Errorf("expected 6 endpoints, got %d", hcs.services[nsn4].endpoints)
   346  	}
   347  	// test the handlers
   348  	testHandler(hcs, nsn2, http.StatusOK, 3, t)
   349  	testHandler(hcs, nsn3, http.StatusOK, 7, t)
   350  	testHandler(hcs, nsn4, http.StatusOK, 6, t)
   351  
   352  	// sync endpoints, missing nsn2
   353  	hcs.SyncEndpoints(map[types.NamespacedName]int{
   354  		nsn3: 7,
   355  		nsn4: 6,
   356  	})
   357  	if len(hcs.services) != 3 {
   358  		t.Errorf("expected 3 services, got %d", len(hcs.services))
   359  	}
   360  	if hcs.services[nsn2].endpoints != 0 {
   361  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn2].endpoints)
   362  	}
   363  	if hcs.services[nsn3].endpoints != 7 {
   364  		t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints)
   365  	}
   366  	if hcs.services[nsn4].endpoints != 6 {
   367  		t.Errorf("expected 6 endpoints, got %d", hcs.services[nsn4].endpoints)
   368  	}
   369  	// test the handlers
   370  	testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t)
   371  	testHandler(hcs, nsn3, http.StatusOK, 7, t)
   372  	testHandler(hcs, nsn4, http.StatusOK, 6, t)
   373  
   374  	// fake a temporary unhealthy proxy
   375  	proxyChecker.healthy = false
   376  	testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, false, t)
   377  	testHandlerWithHealth(hcs, nsn3, http.StatusServiceUnavailable, 7, false, t)
   378  	testHandlerWithHealth(hcs, nsn4, http.StatusServiceUnavailable, 6, false, t)
   379  
   380  	// fake a healthy proxy
   381  	proxyChecker.healthy = true
   382  	testHandlerWithHealth(hcs, nsn2, http.StatusServiceUnavailable, 0, true, t)
   383  	testHandlerWithHealth(hcs, nsn3, http.StatusOK, 7, true, t)
   384  	testHandlerWithHealth(hcs, nsn4, http.StatusOK, 6, true, t)
   385  }
   386  
   387  func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) {
   388  	tHandler(hcs, nsn, status, endpoints, true, t)
   389  }
   390  
   391  func testHandlerWithHealth(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) {
   392  	tHandler(hcs, nsn, status, endpoints, kubeProxyHealthy, t)
   393  }
   394  
   395  func tHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, kubeProxyHealthy bool, t *testing.T) {
   396  	instance := hcs.services[nsn]
   397  	for _, h := range instance.httpServers {
   398  		handler := h.(*fakeHTTPServer).handler
   399  
   400  		req, err := http.NewRequest("GET", "/healthz", nil)
   401  		if err != nil {
   402  			t.Fatal(err)
   403  		}
   404  		resp := httptest.NewRecorder()
   405  
   406  		handler.ServeHTTP(resp, req)
   407  
   408  		if resp.Code != status {
   409  			t.Errorf("expected status code %v, got %v", status, resp.Code)
   410  		}
   411  		var payload hcPayload
   412  		if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
   413  			t.Fatal(err)
   414  		}
   415  		if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace {
   416  			t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service)
   417  		}
   418  		if payload.LocalEndpoints != endpoints {
   419  			t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
   420  		}
   421  		if payload.ServiceProxyHealthy != kubeProxyHealthy {
   422  			t.Errorf("expected %v kubeProxyHealthy, got %v", kubeProxyHealthy, payload.ServiceProxyHealthy)
   423  		}
   424  		if !cmp.Equal(resp.Header()["Content-Type"], []string{"application/json"}) {
   425  			t.Errorf("expected 'Content-Type: application/json' respose header, got: %v", resp.Header()["Content-Type"])
   426  		}
   427  		if !cmp.Equal(resp.Header()["X-Content-Type-Options"], []string{"nosniff"}) {
   428  			t.Errorf("expected 'X-Content-Type-Options: nosniff' respose header, got: %v", resp.Header()["X-Content-Type-Options"])
   429  		}
   430  		if !cmp.Equal(resp.Header()["X-Load-Balancing-Endpoint-Weight"], []string{strconv.Itoa(endpoints)}) {
   431  			t.Errorf("expected 'X-Load-Balancing-Endpoint-Weight: %d' respose header, got: %v", endpoints, resp.Header()["X-Load-Balancing-Endpoint-Weight"])
   432  		}
   433  	}
   434  }
   435  
   436  type nodeTweak func(n *v1.Node)
   437  
   438  func makeNode(tweaks ...nodeTweak) *v1.Node {
   439  	n := &v1.Node{}
   440  	for _, tw := range tweaks {
   441  		tw(n)
   442  	}
   443  	return n
   444  }
   445  
   446  func tweakDeleted() nodeTweak {
   447  	return func(n *v1.Node) {
   448  		n.DeletionTimestamp = &metav1.Time{
   449  			Time: time.Now(),
   450  		}
   451  	}
   452  }
   453  
   454  func tweakTainted(key string) nodeTweak {
   455  	return func(n *v1.Node) {
   456  		n.Spec.Taints = append(n.Spec.Taints, v1.Taint{Key: key})
   457  	}
   458  }
   459  
   460  type serverTest struct {
   461  	server      httpServer
   462  	url         url
   463  	tracking200 int
   464  	tracking503 int
   465  }
   466  
   467  func TestHealthzServer(t *testing.T) {
   468  	metrics.RegisterMetrics()
   469  	listener := newFakeListener()
   470  	httpFactory := newFakeHTTPServerFactory()
   471  	fakeClock := testingclock.NewFakeClock(time.Now())
   472  
   473  	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
   474  	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
   475  
   476  	hsTest := &serverTest{
   477  		server:      server,
   478  		url:         healthzURL,
   479  		tracking200: 0,
   480  		tracking503: 0,
   481  	}
   482  
   483  	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
   484  
   485  	// Should return 200 "OK" if we've synced a node, tainted in any other way
   486  	hs.SyncNode(makeNode(tweakTainted("other")))
   487  	testHTTPHandler(hsTest, http.StatusOK, t)
   488  
   489  	// Should return 503 "ServiceUnavailable" if we've synced a ToBeDeletedTaint node
   490  	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
   491  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   492  
   493  	// Should return 200 "OK" if we've synced a node, tainted in any other way
   494  	hs.SyncNode(makeNode(tweakTainted("other")))
   495  	testHTTPHandler(hsTest, http.StatusOK, t)
   496  
   497  	// Should return 503 "ServiceUnavailable" if we've synced a deleted node
   498  	hs.SyncNode(makeNode(tweakDeleted()))
   499  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   500  }
   501  
   502  func TestLivezServer(t *testing.T) {
   503  	metrics.RegisterMetrics()
   504  	listener := newFakeListener()
   505  	httpFactory := newFakeHTTPServerFactory()
   506  	fakeClock := testingclock.NewFakeClock(time.Now())
   507  
   508  	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
   509  	server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
   510  
   511  	hsTest := &serverTest{
   512  		server:      server,
   513  		url:         livezURL,
   514  		tracking200: 0,
   515  		tracking503: 0,
   516  	}
   517  
   518  	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
   519  
   520  	// Should return 200 "OK" irrespective of node syncs
   521  	hs.SyncNode(makeNode(tweakTainted("other")))
   522  	testHTTPHandler(hsTest, http.StatusOK, t)
   523  
   524  	// Should return 200 "OK" irrespective of node syncs
   525  	hs.SyncNode(makeNode(tweakTainted(ToBeDeletedTaint)))
   526  	testHTTPHandler(hsTest, http.StatusOK, t)
   527  
   528  	// Should return 200 "OK" irrespective of node syncs
   529  	hs.SyncNode(makeNode(tweakTainted("other")))
   530  	testHTTPHandler(hsTest, http.StatusOK, t)
   531  
   532  	// Should return 200 "OK" irrespective of node syncs
   533  	hs.SyncNode(makeNode(tweakDeleted()))
   534  	testHTTPHandler(hsTest, http.StatusOK, t)
   535  }
   536  
   537  type url string
   538  
   539  var (
   540  	healthzURL url = "/healthz"
   541  	livezURL   url = "/livez"
   542  )
   543  
   544  func testProxierHealthUpdater(hs *ProxierHealthServer, hsTest *serverTest, fakeClock *testingclock.FakeClock, t *testing.T) {
   545  	// Should return 200 "OK" by default.
   546  	testHTTPHandler(hsTest, http.StatusOK, t)
   547  
   548  	// Should return 200 "OK" after first update for both IPv4 and IPv6 proxiers.
   549  	hs.Updated(v1.IPv4Protocol)
   550  	hs.Updated(v1.IPv6Protocol)
   551  	testHTTPHandler(hsTest, http.StatusOK, t)
   552  
   553  	// Should continue to return 200 "OK" as long as no further updates are queued for any proxier.
   554  	fakeClock.Step(25 * time.Second)
   555  	testHTTPHandler(hsTest, http.StatusOK, t)
   556  
   557  	// Should return 503 "ServiceUnavailable" if IPv4 proxier exceed max update-processing time.
   558  	hs.QueuedUpdate(v1.IPv4Protocol)
   559  	fakeClock.Step(25 * time.Second)
   560  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   561  
   562  	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
   563  	hs.Updated(v1.IPv4Protocol)
   564  	hs.Updated(v1.IPv6Protocol)
   565  	fakeClock.Step(5 * time.Second)
   566  	testHTTPHandler(hsTest, http.StatusOK, t)
   567  
   568  	// Should return 503 "ServiceUnavailable" if IPv6 proxier exceed max update-processing time.
   569  	hs.QueuedUpdate(v1.IPv6Protocol)
   570  	fakeClock.Step(25 * time.Second)
   571  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   572  
   573  	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
   574  	hs.Updated(v1.IPv4Protocol)
   575  	hs.Updated(v1.IPv6Protocol)
   576  	fakeClock.Step(5 * time.Second)
   577  	testHTTPHandler(hsTest, http.StatusOK, t)
   578  
   579  	// Should return 503 "ServiceUnavailable" if both IPv4 and IPv6 proxiers exceed max update-processing time.
   580  	hs.QueuedUpdate(v1.IPv4Protocol)
   581  	hs.QueuedUpdate(v1.IPv6Protocol)
   582  	fakeClock.Step(25 * time.Second)
   583  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   584  
   585  	// Should return 200 "OK" after processing update for both IPv4 and IPv6 proxiers.
   586  	hs.Updated(v1.IPv4Protocol)
   587  	hs.Updated(v1.IPv6Protocol)
   588  	fakeClock.Step(5 * time.Second)
   589  	testHTTPHandler(hsTest, http.StatusOK, t)
   590  
   591  	// If IPv6 proxier is late for an update but IPv4 proxier is not then updating IPv4 proxier should have no effect.
   592  	hs.QueuedUpdate(v1.IPv6Protocol)
   593  	fakeClock.Step(25 * time.Second)
   594  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   595  
   596  	hs.Updated(v1.IPv4Protocol)
   597  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   598  
   599  	hs.Updated(v1.IPv6Protocol)
   600  	testHTTPHandler(hsTest, http.StatusOK, t)
   601  
   602  	// If both IPv4 and IPv6 proxiers are late for an update, we shouldn't report 200 "OK" until after both of them update.
   603  	hs.QueuedUpdate(v1.IPv4Protocol)
   604  	hs.QueuedUpdate(v1.IPv6Protocol)
   605  	fakeClock.Step(25 * time.Second)
   606  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   607  
   608  	hs.Updated(v1.IPv4Protocol)
   609  	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
   610  
   611  	hs.Updated(v1.IPv6Protocol)
   612  	testHTTPHandler(hsTest, http.StatusOK, t)
   613  }
   614  
   615  func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
   616  	handler := hsTest.server.(*fakeHTTPServer).handler
   617  	req, err := http.NewRequest("GET", string(hsTest.url), nil)
   618  	if err != nil {
   619  		t.Fatal(err)
   620  	}
   621  	resp := httptest.NewRecorder()
   622  
   623  	handler.ServeHTTP(resp, req)
   624  
   625  	if resp.Code != status {
   626  		t.Errorf("expected status code %v, got %v", status, resp.Code)
   627  	}
   628  	var payload healthzPayload
   629  	if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
   630  		t.Fatal(err)
   631  	}
   632  
   633  	if status == http.StatusOK {
   634  		hsTest.tracking200++
   635  	}
   636  	if status == http.StatusServiceUnavailable {
   637  		hsTest.tracking503++
   638  	}
   639  	if hsTest.url == healthzURL {
   640  		testMetricEquals(metrics.ProxyHealthzTotal.WithLabelValues("200"), float64(hsTest.tracking200), t)
   641  		testMetricEquals(metrics.ProxyHealthzTotal.WithLabelValues("503"), float64(hsTest.tracking503), t)
   642  	}
   643  	if hsTest.url == livezURL {
   644  		testMetricEquals(metrics.ProxyLivezTotal.WithLabelValues("200"), float64(hsTest.tracking200), t)
   645  		testMetricEquals(metrics.ProxyLivezTotal.WithLabelValues("503"), float64(hsTest.tracking503), t)
   646  	}
   647  }
   648  
   649  func testMetricEquals(metric basemetrics.CounterMetric, expected float64, t *testing.T) {
   650  	t.Helper()
   651  	val, err := testutil.GetCounterMetricValue(metric)
   652  	if err != nil {
   653  		t.Errorf("unable to retrieve value for metric, err: %v", err)
   654  	}
   655  	if val != expected {
   656  		t.Errorf("expected: %v, found: %v", expected, val)
   657  	}
   658  }
   659  
   660  func TestServerWithSelectiveListeningAddress(t *testing.T) {
   661  	listener := newFakeListener()
   662  	httpFactory := newFakeHTTPServerFactory()
   663  	proxyChecker := &fakeProxierHealthChecker{true}
   664  
   665  	// limiting addresses to loop back. We don't want any cleverness here around getting IP for
   666  	// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
   667  	nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"127.0.0.0/8"}, nil)
   668  
   669  	hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
   670  	hcs := hcsi.(*server)
   671  	if len(hcs.services) != 0 {
   672  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   673  	}
   674  
   675  	// sync nothing
   676  	hcs.SyncServices(nil)
   677  	if len(hcs.services) != 0 {
   678  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   679  	}
   680  	hcs.SyncEndpoints(nil)
   681  	if len(hcs.services) != 0 {
   682  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   683  	}
   684  
   685  	// sync unknown endpoints, should be dropped
   686  	hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93})
   687  	if len(hcs.services) != 0 {
   688  		t.Errorf("expected 0 services, got %d", len(hcs.services))
   689  	}
   690  
   691  	// sync a real service
   692  	nsn := mknsn("a", "b")
   693  	hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376})
   694  	if len(hcs.services) != 1 {
   695  		t.Errorf("expected 1 service, got %d", len(hcs.services))
   696  	}
   697  	if hcs.services[nsn].endpoints != 0 {
   698  		t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
   699  	}
   700  	if len(listener.openPorts) != 1 {
   701  		t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), dump.Pretty(listener.openPorts))
   702  	}
   703  	if !listener.hasPort("127.0.0.1:9376") {
   704  		t.Errorf("expected port :9376 to be open\n%s", dump.Pretty(listener.openPorts))
   705  	}
   706  	// test the handler
   707  	testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
   708  }
   709  

View as plain text