...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"crypto/x509"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"net/http"
    27  	"net/http/httptest"
    28  	"net/http/httputil"
    29  	"net/url"
    30  	"os"
    31  	"path/filepath"
    32  	"reflect"
    33  	"strings"
    34  	"sync/atomic"
    35  	"testing"
    36  
    37  	"k8s.io/apiserver/pkg/audit"
    38  	"k8s.io/apiserver/pkg/server/dynamiccertificates"
    39  	"k8s.io/client-go/transport"
    40  
    41  	"golang.org/x/net/websocket"
    42  
    43  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    44  	"k8s.io/apimachinery/pkg/types"
    45  	utilnet "k8s.io/apimachinery/pkg/util/net"
    46  	"k8s.io/apimachinery/pkg/util/proxy"
    47  	"k8s.io/apimachinery/pkg/util/sets"
    48  	"k8s.io/apiserver/pkg/authentication/user"
    49  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    50  	"k8s.io/apiserver/pkg/server/egressselector"
    51  	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
    52  	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
    53  	"k8s.io/component-base/metrics"
    54  	"k8s.io/component-base/metrics/legacyregistry"
    55  	apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    56  	"k8s.io/utils/pointer"
    57  )
    58  
    59  type targetHTTPHandler struct {
    60  	called  bool
    61  	headers map[string][]string
    62  	path    string
    63  	host    string
    64  }
    65  
    66  func (d *targetHTTPHandler) Reset() {
    67  	d.path = ""
    68  	d.called = false
    69  	d.headers = nil
    70  	d.host = ""
    71  }
    72  
    73  func (d *targetHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    74  	d.path = r.URL.Path
    75  	d.called = true
    76  	d.headers = r.Header
    77  	d.host = r.Host
    78  	w.WriteHeader(http.StatusOK)
    79  }
    80  
    81  func contextHandler(handler http.Handler, user user.Info) http.Handler {
    82  	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    83  		ctx := req.Context()
    84  		if user != nil {
    85  			ctx = genericapirequest.WithUser(ctx, user)
    86  		}
    87  		resolver := &genericapirequest.RequestInfoFactory{
    88  			APIPrefixes:          sets.NewString("api", "apis"),
    89  			GrouplessAPIPrefixes: sets.NewString("api"),
    90  		}
    91  		info, err := resolver.NewRequestInfo(req)
    92  		if err == nil {
    93  			ctx = genericapirequest.WithRequestInfo(ctx, info)
    94  		}
    95  		req = req.WithContext(ctx)
    96  		handler.ServeHTTP(w, req)
    97  	})
    98  }
    99  
   100  type mockedRouter struct {
   101  	destinationHost string
   102  	err             error
   103  }
   104  
   105  func (r *mockedRouter) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
   106  	return &url.URL{Scheme: "https", Host: r.destinationHost}, r.err
   107  }
   108  
   109  func emptyCert() []byte {
   110  	return []byte{}
   111  }
   112  
   113  func TestProxyHandler(t *testing.T) {
   114  	tests := map[string]struct {
   115  		user       user.Info
   116  		path       string
   117  		apiService *apiregistration.APIService
   118  
   119  		serviceResolver        ServiceResolver
   120  		serviceCertOverride    []byte
   121  		increaseSANWarnCounter bool
   122  
   123  		expectedStatusCode int
   124  		expectedBody       string
   125  		expectedCalled     bool
   126  		expectedHeaders    map[string][]string
   127  	}{
   128  		"no target": {
   129  			expectedStatusCode: http.StatusNotFound,
   130  		},
   131  		"no user": {
   132  			apiService: &apiregistration.APIService{
   133  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   134  				Spec: apiregistration.APIServiceSpec{
   135  					Service: &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
   136  					Group:   "foo",
   137  					Version: "v1",
   138  				},
   139  				Status: apiregistration.APIServiceStatus{
   140  					Conditions: []apiregistration.APIServiceCondition{
   141  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   142  					},
   143  				},
   144  			},
   145  			expectedStatusCode: http.StatusInternalServerError,
   146  			expectedBody:       "missing user",
   147  		},
   148  		"proxy with user, insecure": {
   149  			user: &user.DefaultInfo{
   150  				Name:   "username",
   151  				Groups: []string{"one", "two"},
   152  			},
   153  			path: "/request/path",
   154  			apiService: &apiregistration.APIService{
   155  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   156  				Spec: apiregistration.APIServiceSpec{
   157  					Service:               &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
   158  					Group:                 "foo",
   159  					Version:               "v1",
   160  					InsecureSkipTLSVerify: true,
   161  				},
   162  				Status: apiregistration.APIServiceStatus{
   163  					Conditions: []apiregistration.APIServiceCondition{
   164  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   165  					},
   166  				},
   167  			},
   168  			expectedStatusCode: http.StatusOK,
   169  			expectedCalled:     true,
   170  			expectedHeaders: map[string][]string{
   171  				"X-Forwarded-Proto": {"https"},
   172  				"X-Forwarded-Uri":   {"/request/path"},
   173  				"X-Forwarded-For":   {"127.0.0.1"},
   174  				"X-Remote-User":     {"username"},
   175  				"User-Agent":        {"Go-http-client/1.1"},
   176  				"Accept-Encoding":   {"gzip"},
   177  				"X-Remote-Group":    {"one", "two"},
   178  			},
   179  		},
   180  		"proxy with user, cabundle": {
   181  			user: &user.DefaultInfo{
   182  				Name:   "username",
   183  				Groups: []string{"one", "two"},
   184  			},
   185  			path: "/request/path",
   186  			apiService: &apiregistration.APIService{
   187  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   188  				Spec: apiregistration.APIServiceSpec{
   189  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   190  					Group:    "foo",
   191  					Version:  "v1",
   192  					CABundle: testCACrt,
   193  				},
   194  				Status: apiregistration.APIServiceStatus{
   195  					Conditions: []apiregistration.APIServiceCondition{
   196  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   197  					},
   198  				},
   199  			},
   200  			expectedStatusCode: http.StatusOK,
   201  			expectedCalled:     true,
   202  			expectedHeaders: map[string][]string{
   203  				"X-Forwarded-Proto": {"https"},
   204  				"X-Forwarded-Uri":   {"/request/path"},
   205  				"X-Forwarded-For":   {"127.0.0.1"},
   206  				"X-Remote-User":     {"username"},
   207  				"User-Agent":        {"Go-http-client/1.1"},
   208  				"Accept-Encoding":   {"gzip"},
   209  				"X-Remote-Group":    {"one", "two"},
   210  			},
   211  		},
   212  		"service unavailable": {
   213  			user: &user.DefaultInfo{
   214  				Name:   "username",
   215  				Groups: []string{"one", "two"},
   216  			},
   217  			path: "/request/path",
   218  			apiService: &apiregistration.APIService{
   219  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   220  				Spec: apiregistration.APIServiceSpec{
   221  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   222  					Group:    "foo",
   223  					Version:  "v1",
   224  					CABundle: testCACrt,
   225  				},
   226  				Status: apiregistration.APIServiceStatus{
   227  					Conditions: []apiregistration.APIServiceCondition{
   228  						{Type: apiregistration.Available, Status: apiregistration.ConditionFalse},
   229  					},
   230  				},
   231  			},
   232  			expectedStatusCode: http.StatusServiceUnavailable,
   233  		},
   234  		"service unresolveable": {
   235  			user: &user.DefaultInfo{
   236  				Name:   "username",
   237  				Groups: []string{"one", "two"},
   238  			},
   239  			path:            "/request/path",
   240  			serviceResolver: &mockedRouter{err: fmt.Errorf("unresolveable")},
   241  			apiService: &apiregistration.APIService{
   242  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   243  				Spec: apiregistration.APIServiceSpec{
   244  					Service:  &apiregistration.ServiceReference{Name: "bad-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   245  					Group:    "foo",
   246  					Version:  "v1",
   247  					CABundle: testCACrt,
   248  				},
   249  				Status: apiregistration.APIServiceStatus{
   250  					Conditions: []apiregistration.APIServiceCondition{
   251  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   252  					},
   253  				},
   254  			},
   255  			expectedStatusCode: http.StatusServiceUnavailable,
   256  		},
   257  		"fail on bad serving cert": {
   258  			user: &user.DefaultInfo{
   259  				Name:   "username",
   260  				Groups: []string{"one", "two"},
   261  			},
   262  			path: "/request/path",
   263  			apiService: &apiregistration.APIService{
   264  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   265  				Spec: apiregistration.APIServiceSpec{
   266  					Service: &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
   267  					Group:   "foo",
   268  					Version: "v1",
   269  				},
   270  				Status: apiregistration.APIServiceStatus{
   271  					Conditions: []apiregistration.APIServiceCondition{
   272  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   273  					},
   274  				},
   275  			},
   276  			expectedStatusCode: http.StatusServiceUnavailable,
   277  		},
   278  		"fail on bad serving cert w/o SAN and increase SAN error counter metrics": {
   279  			user: &user.DefaultInfo{
   280  				Name:   "username",
   281  				Groups: []string{"one", "two"},
   282  			},
   283  			path: "/request/path",
   284  			apiService: &apiregistration.APIService{
   285  				ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   286  				Spec: apiregistration.APIServiceSpec{
   287  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   288  					Group:    "foo",
   289  					Version:  "v1",
   290  					CABundle: testCACrt,
   291  				},
   292  				Status: apiregistration.APIServiceStatus{
   293  					Conditions: []apiregistration.APIServiceCondition{
   294  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   295  					},
   296  				},
   297  			},
   298  			serviceCertOverride:    svcCrtNoSAN,
   299  			increaseSANWarnCounter: true,
   300  			expectedStatusCode:     http.StatusServiceUnavailable,
   301  		},
   302  	}
   303  
   304  	target := &targetHTTPHandler{}
   305  	for name, tc := range tests {
   306  		target.Reset()
   307  		legacyregistry.Reset()
   308  
   309  		func() {
   310  			targetServer := httptest.NewUnstartedServer(target)
   311  			serviceCert := tc.serviceCertOverride
   312  			if serviceCert == nil {
   313  				serviceCert = svcCrt
   314  			}
   315  			if cert, err := tls.X509KeyPair(serviceCert, svcKey); err != nil {
   316  				t.Fatal(err)
   317  			} else {
   318  				targetServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
   319  			}
   320  			targetServer.StartTLS()
   321  			defer targetServer.Close()
   322  
   323  			serviceResolver := tc.serviceResolver
   324  			if serviceResolver == nil {
   325  				serviceResolver = &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}
   326  			}
   327  			handler := &proxyHandler{
   328  				localDelegate:              http.NewServeMux(),
   329  				serviceResolver:            serviceResolver,
   330  				proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
   331  			}
   332  			server := httptest.NewServer(contextHandler(handler, tc.user))
   333  			defer server.Close()
   334  
   335  			if tc.apiService != nil {
   336  				handler.updateAPIService(tc.apiService)
   337  				curr := handler.handlingInfo.Load().(proxyHandlingInfo)
   338  				handler.handlingInfo.Store(curr)
   339  			}
   340  
   341  			resp, err := http.Get(server.URL + tc.path)
   342  			if err != nil {
   343  				t.Errorf("%s: %v", name, err)
   344  				return
   345  			}
   346  			if e, a := tc.expectedStatusCode, resp.StatusCode; e != a {
   347  				body, _ := httputil.DumpResponse(resp, true)
   348  				t.Logf("%s: %v", name, string(body))
   349  				t.Errorf("%s: expected %v, got %v", name, e, a)
   350  				return
   351  			}
   352  			bytes, err := io.ReadAll(resp.Body)
   353  			if err != nil {
   354  				t.Errorf("%s: %v", name, err)
   355  				return
   356  			}
   357  			if !strings.Contains(string(bytes), tc.expectedBody) {
   358  				t.Errorf("%s: expected %q, got %q", name, tc.expectedBody, string(bytes))
   359  				return
   360  			}
   361  
   362  			if e, a := tc.expectedCalled, target.called; e != a {
   363  				t.Errorf("%s: expected %v, got %v", name, e, a)
   364  				return
   365  			}
   366  			// this varies every test
   367  			delete(target.headers, "X-Forwarded-Host")
   368  			if e, a := tc.expectedHeaders, target.headers; !reflect.DeepEqual(e, a) {
   369  				t.Errorf("%s: expected %v, got %v", name, e, a)
   370  				return
   371  			}
   372  			if e, a := targetServer.Listener.Addr().String(), target.host; tc.expectedCalled && !reflect.DeepEqual(e, a) {
   373  				t.Errorf("%s: expected %v, got %v", name, e, a)
   374  				return
   375  			}
   376  
   377  			if tc.increaseSANWarnCounter {
   378  				errorCounter := getSingleCounterValueFromRegistry(t, legacyregistry.DefaultGatherer, "apiserver_kube_aggregator_x509_missing_san_total")
   379  				if errorCounter == -1 {
   380  					t.Errorf("failed to get the x509_missing_san_total metrics: %v", err)
   381  				}
   382  				if int(errorCounter) != 1 {
   383  					t.Errorf("expected the x509_missing_san_total to be 1, but it's %d", errorCounter)
   384  				}
   385  			}
   386  		}()
   387  	}
   388  }
   389  
   390  type mockEgressDialer struct {
   391  	called int
   392  }
   393  
   394  func (m *mockEgressDialer) dial(ctx context.Context, net, addr string) (net.Conn, error) {
   395  	m.called++
   396  	return http.DefaultTransport.(*http.Transport).DialContext(ctx, net, addr)
   397  }
   398  
   399  func (m *mockEgressDialer) dialBroken(ctx context.Context, net, addr string) (net.Conn, error) {
   400  	m.called++
   401  	return nil, fmt.Errorf("Broken dialer")
   402  }
   403  
   404  func newDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
   405  	dialer := &mockEgressDialer{}
   406  	m := make(map[egressselector.EgressType]utilnet.DialFunc)
   407  	m[egressselector.Cluster] = dialer.dial
   408  	es := egressselector.NewEgressSelectorWithMap(m)
   409  	return dialer, es
   410  }
   411  
   412  func newBrokenDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
   413  	dialer := &mockEgressDialer{}
   414  	m := make(map[egressselector.EgressType]utilnet.DialFunc)
   415  	m[egressselector.Cluster] = dialer.dialBroken
   416  	es := egressselector.NewEgressSelectorWithMap(m)
   417  	return dialer, es
   418  }
   419  
   420  func TestProxyUpgrade(t *testing.T) {
   421  	upgradeUser := "upgradeUser"
   422  	testcases := map[string]struct {
   423  		APIService        *apiregistration.APIService
   424  		NewEgressSelector func() (*mockEgressDialer, *egressselector.EgressSelector)
   425  		ExpectError       bool
   426  		ExpectCalled      bool
   427  	}{
   428  		"valid hostname + CABundle": {
   429  			APIService: &apiregistration.APIService{
   430  				Spec: apiregistration.APIServiceSpec{
   431  					CABundle: testCACrt,
   432  					Group:    "mygroup",
   433  					Version:  "v1",
   434  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   435  				},
   436  				Status: apiregistration.APIServiceStatus{
   437  					Conditions: []apiregistration.APIServiceCondition{
   438  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   439  					},
   440  				},
   441  			},
   442  			ExpectError:  false,
   443  			ExpectCalled: true,
   444  		},
   445  		"invalid hostname + insecure": {
   446  			APIService: &apiregistration.APIService{
   447  				Spec: apiregistration.APIServiceSpec{
   448  					InsecureSkipTLSVerify: true,
   449  					Group:                 "mygroup",
   450  					Version:               "v1",
   451  					Service:               &apiregistration.ServiceReference{Name: "invalid-service", Namespace: "invalid-ns", Port: pointer.Int32Ptr(443)},
   452  				},
   453  				Status: apiregistration.APIServiceStatus{
   454  					Conditions: []apiregistration.APIServiceCondition{
   455  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   456  					},
   457  				},
   458  			},
   459  			ExpectError:  false,
   460  			ExpectCalled: true,
   461  		},
   462  		"invalid hostname + CABundle": {
   463  			APIService: &apiregistration.APIService{
   464  				Spec: apiregistration.APIServiceSpec{
   465  					CABundle: testCACrt,
   466  					Group:    "mygroup",
   467  					Version:  "v1",
   468  					Service:  &apiregistration.ServiceReference{Name: "invalid-service", Namespace: "invalid-ns", Port: pointer.Int32Ptr(443)},
   469  				},
   470  				Status: apiregistration.APIServiceStatus{
   471  					Conditions: []apiregistration.APIServiceCondition{
   472  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   473  					},
   474  				},
   475  			},
   476  			ExpectError:  true,
   477  			ExpectCalled: false,
   478  		},
   479  		"valid hostname + CABundle + egress selector": {
   480  			APIService: &apiregistration.APIService{
   481  				Spec: apiregistration.APIServiceSpec{
   482  					CABundle: testCACrt,
   483  					Group:    "mygroup",
   484  					Version:  "v1",
   485  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   486  				},
   487  				Status: apiregistration.APIServiceStatus{
   488  					Conditions: []apiregistration.APIServiceCondition{
   489  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   490  					},
   491  				},
   492  			},
   493  			NewEgressSelector: newDialerAndSelector,
   494  			ExpectError:       false,
   495  			ExpectCalled:      true,
   496  		},
   497  		"valid hostname + CABundle + egress selector non working": {
   498  			APIService: &apiregistration.APIService{
   499  				Spec: apiregistration.APIServiceSpec{
   500  					CABundle: testCACrt,
   501  					Group:    "mygroup",
   502  					Version:  "v1",
   503  					Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   504  				},
   505  				Status: apiregistration.APIServiceStatus{
   506  					Conditions: []apiregistration.APIServiceCondition{
   507  						{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   508  					},
   509  				},
   510  			},
   511  			NewEgressSelector: newBrokenDialerAndSelector,
   512  			ExpectError:       true,
   513  			ExpectCalled:      false,
   514  		},
   515  	}
   516  
   517  	for k, tc := range testcases {
   518  		tcName := k
   519  		t.Run(tcName, func(t *testing.T) {
   520  			path := "/apis/" + tc.APIService.Spec.Group + "/" + tc.APIService.Spec.Version + "/foo"
   521  			timesCalled := int32(0)
   522  			backendHandler := http.NewServeMux()
   523  			backendHandler.Handle(path, websocket.Handler(func(ws *websocket.Conn) {
   524  				atomic.AddInt32(&timesCalled, 1)
   525  				defer ws.Close()
   526  				req := ws.Request()
   527  				user := req.Header.Get("X-Remote-User")
   528  				if user != upgradeUser {
   529  					t.Errorf("expected user %q, got %q", upgradeUser, user)
   530  				}
   531  				body := make([]byte, 5)
   532  				ws.Read(body)
   533  				ws.Write([]byte("hello " + string(body)))
   534  			}))
   535  
   536  			backendServer := httptest.NewUnstartedServer(backendHandler)
   537  			cert, err := tls.X509KeyPair(svcCrt, svcKey)
   538  			if err != nil {
   539  				t.Errorf("https (valid hostname): %v", err)
   540  				return
   541  			}
   542  			backendServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
   543  			backendServer.StartTLS()
   544  			defer backendServer.Close()
   545  
   546  			defer func() {
   547  				if called := atomic.LoadInt32(&timesCalled) > 0; called != tc.ExpectCalled {
   548  					t.Errorf("%s: expected called=%v, got %v", tcName, tc.ExpectCalled, called)
   549  				}
   550  			}()
   551  
   552  			serverURL, _ := url.Parse(backendServer.URL)
   553  			proxyHandler := &proxyHandler{
   554  				serviceResolver:            &mockedRouter{destinationHost: serverURL.Host},
   555  				proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
   556  			}
   557  
   558  			var dialer *mockEgressDialer
   559  			var selector *egressselector.EgressSelector
   560  			if tc.NewEgressSelector != nil {
   561  				dialer, selector = tc.NewEgressSelector()
   562  
   563  				egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
   564  				if err != nil {
   565  					t.Fatal(err)
   566  				}
   567  				if egressDialer != nil {
   568  					proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
   569  				}
   570  			}
   571  
   572  			proxyHandler.updateAPIService(tc.APIService)
   573  			aggregator := httptest.NewServer(contextHandler(proxyHandler, &user.DefaultInfo{Name: upgradeUser}))
   574  			defer aggregator.Close()
   575  
   576  			ws, err := websocket.Dial("ws://"+aggregator.Listener.Addr().String()+path, "", "http://127.0.0.1/")
   577  			if err != nil {
   578  				if !tc.ExpectError {
   579  					t.Errorf("%s: websocket dial err: %s", tcName, err)
   580  				}
   581  				return
   582  			}
   583  			defer ws.Close()
   584  
   585  			// if the egressselector is configured assume it has to be called
   586  			if dialer != nil && dialer.called != 1 {
   587  				t.Errorf("expect egress dialer gets called %d times, got %d", 1, dialer.called)
   588  			}
   589  
   590  			if tc.ExpectError {
   591  				t.Errorf("%s: expected websocket error, got none", tcName)
   592  				return
   593  			}
   594  
   595  			if _, err := ws.Write([]byte("world")); err != nil {
   596  				t.Errorf("%s: write err: %s", tcName, err)
   597  				return
   598  			}
   599  
   600  			response := make([]byte, 20)
   601  			n, err := ws.Read(response)
   602  			if err != nil {
   603  				t.Errorf("%s: read err: %s", tcName, err)
   604  				return
   605  			}
   606  			if e, a := "hello world", string(response[0:n]); e != a {
   607  				t.Errorf("%s: expected '%#v', got '%#v'", tcName, e, a)
   608  				return
   609  			}
   610  		})
   611  	}
   612  }
   613  
   614  var testCACrt = []byte(`-----BEGIN CERTIFICATE-----
   615  MIIDGTCCAgGgAwIBAgIUAlOGbZ9MSBRFDMq483nGW7h4YNIwDQYJKoZIhvcNAQEL
   616  BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
   617  GA8yMjk0MDcyMzE0MjgwNVowGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTCC
   618  ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANQF9aox1wJlB7wrFeEDYlRk
   619  2AIfC28PZYjW3LsW7/gas2ImmRpzdZYq3nNFQwF67sUudeuuNNAvEngb8Q1wojG7
   620  Uftt52c9e0Hi5LDxElWV3Tw1XyZFJsk5uwVNb377r7CDfTX3WUsX1WlUeUF6xmwE
   621  M4jYQJ9pMPNUOEWpe7G8daTYineTVvrHvGpxVMMSpOWTWy4+oqWaz5tfFSbyvNZT
   622  +eOLNkDo441KfXvb66zWV4AEfB2QDyGGMuPUT/FgsZHNuj/WNjt3bWvyey9ZGlDm
   623  LPnJgbzEP1FnfIdtuSpHhbWox2Jnuht4hCwhTW1lcAi68MSQEs8KqptEhIJoIxkC
   624  AwEAAaNTMFEwHQYDVR0OBBYEFJnGJQd3VkQP5cZLB1n9/FRKyBLPMB8GA1UdIwQY
   625  MBaAFJnGJQd3VkQP5cZLB1n9/FRKyBLPMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
   626  hvcNAQELBQADggEBALwqR2oo3v5Ghs9hS1YQIqegQ/IGZqQwiRz2HFTUGzf5+nUY
   627  BpZHQPapLJ6Ki687rY4nkdEAMCeZjefBFc6uawo6rY4O8IiJAQbDprNNK8oerwiM
   628  BWSDDDjoNxMZMCegSAv39YSonecKZsg7+l1K/nmuQNehgHNem71ZroaRCFvJJ59E
   629  WSd3QP+Gh9iKabsDnkBrTk5KFa7X24c43DJ23kPE49NOwBhiM6Fs8q+tdzWzaVSb
   630  56uXONZxYmFH5yDFvnBIqk2Fys5Klsn6IsM1BCgH2snbA6kwh9Kph4pLdAVGyR9i
   631  MxfBxx4eUypOzIBGqa3OmvMcuNElBe8fcUtpqO0=
   632  -----END CERTIFICATE-----`)
   633  
   634  /* testCAKey
   635  -----BEGIN RSA PRIVATE KEY-----
   636  MIIEpAIBAAKCAQEA1AX1qjHXAmUHvCsV4QNiVGTYAh8Lbw9liNbcuxbv+BqzYiaZ
   637  GnN1lirec0VDAXruxS5166400C8SeBvxDXCiMbtR+23nZz17QeLksPESVZXdPDVf
   638  JkUmyTm7BU1vfvuvsIN9NfdZSxfVaVR5QXrGbAQziNhAn2kw81Q4Ral7sbx1pNiK
   639  d5NW+se8anFUwxKk5ZNbLj6ipZrPm18VJvK81lP544s2QOjjjUp9e9vrrNZXgAR8
   640  HZAPIYYy49RP8WCxkc26P9Y2O3dta/J7L1kaUOYs+cmBvMQ/UWd8h225KkeFtajH
   641  Yme6G3iELCFNbWVwCLrwxJASzwqqm0SEgmgjGQIDAQABAoIBAGRWua8kzRMWCvYT
   642  EdSeDF/SJaPDW17g03VR8b4cmc45nKEbkSNCduhtOz8kDRTbP7pTRX0WwWmwjTYI
   643  SyjIIAoXEzJBDdz+7KD+pqnSPJICTWPcAj6TRUq/pnFY9yYKKFgJsizi9QAjtFyX
   644  nJbPaq3dwyHE7bhDSOYu+j6FecNfhqrvj1JbRvIhllKaZJC6He3mNCkHeHtW6ZFk
   645  qJJzWQtPFwqT7tsYCJikwUcQs6QqhD+pPnTYlAkBf24z8ByR8ET7vvOBpH53ufG/
   646  +gv1K1H+JXQhyO8p4ga4/DdWl+qZoQyTDzm0wy/q+lo/w/pzdIVtgUmguVk/qXad
   647  Bgb6ie0CgYEA/StHVneYIIBmHNdz2fPMMbMzv99/ngDN+ed6Swj+aZKeRgf+QljU
   648  QScwRHUlGKvrFE7Dq/TXVEYO9ksC7tEpIQaSKUlHLBJhhCo8YvfLvH6zoF3F1W6d
   649  7a0ZyXhCWEp5NaNhKdRUMNVXt5H5jf5IGcGXgsErDAStJhiBW/+D0mcCgYEA1mTl
   650  qjclhUr1Ef+wu2N2kNhi8NnScapC1fKjqCzlGcT74lB0BZpizY/hsxSlrtXZE0jI
   651  DhrpiYaxKx4G/Ktr6cu4u0V6sYLH36+wbxmSV2XokHhUfPXZYZNO6+s7mHa8P10N
   652  byTvocQzDhRwN3aD0d32/f8FFvPCZrg2MKAB7n8CgYEArLvZqZJhtlNE2IrcHaos
   653  +QAG3/QzE3ADGW4pT4bsZsXFvYx4m3YWI/oEAcFXtTSfaTSwZuPgAzzlun/FmYIW
   654  KNVd5lN7/wLvjAhxOSlO1eYw0ssITy5xDJhdjsvBoJH3j3RQuASKCOOXPMWZWptT
   655  QFeI84quvz11kheIM2fr3iMCgYApKPnGsgusCXX/XJ1rfG744/Iq10bFt7BZLtoo
   656  oWXiiqTpEBUWNkudt2/XV7FvXXLtdt2hh50qYAeHhZ5FyAtRuWDf4zjo93i0AyDW
   657  U4x65v+9LLzbuL9hMkzGkkTAwprld1Hq8qZm4ioDG/1nSIOKORkALoOlomrCGb+d
   658  mjqEtQKBgQCwtZ7yWxDn/dHeO32VBZOR2YZutOc61BtQHdMqoYsk7qR2ixxVG2bb
   659  1jTedAqac+x0HnJ6au5jdbv0Z95cyyX22MMWaW/H/LNMLxL85OaZfiqjVnntzHcK
   660  jHXdYlJHC8Eslr3iUvRUodgRwOB8c4wWF7s5b6mxGqoXgsNsLrOUPw==
   661  -----END RSA PRIVATE KEY-----
   662  */
   663  
   664  // valid for hostname test-service.test-ns.svc
   665  // signed by testCACrt
   666  var svcCrt = []byte(`-----BEGIN CERTIFICATE-----
   667  MIIDMjCCAhqgAwIBAgIUEBND1EVKxjU7UaJ1ZBw1glkXuaowDQYJKoZIhvcNAQEL
   668  BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
   669  GA8yMjk0MDcyMzE0MjgwNVowIzEhMB8GA1UEAwwYdGVzdC1zZXJ2aWNlLnRlc3Qt
   670  bnMuc3ZjMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvDXYxvaUdbX/
   671  MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDgfSABCN4XVsrd+/A+/zQt+EyJEJxM
   672  rd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyERWimZhqbu0O+0EpFx5qLzQ5eLabLU
   673  9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zmxBay9vTY7P7sLkfZ8LifP6jgQ5NH
   674  QkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifTNE9qo2NL1iiPGTpot65z83BWeu/q
   675  WOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7FUJv53EJgmEp4UOaY1fAFtAFQQbVz
   676  tGjYGpZ22wIDAQABo2QwYjAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUE
   677  FjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwKQYDVR0RBCIwIIcEfwAAAYIYdGVzdC1z
   678  ZXJ2aWNlLnRlc3QtbnMuc3ZjMA0GCSqGSIb3DQEBCwUAA4IBAQCw/EoFXFahLC4g
   679  4iq9VWhnCmAqUv6IuJqOMC+qEH7fSB3UDAjL4A2iuNJaBAxhI2bccoP2wtqZCkHH
   680  0YLyoKOPjgl6VZtByco8Su7T9yOaef6aX1OP4Snm/aeYdVbjSBKVwMywmmb34XFa
   681  azChi6sq4TFPNesUUoEGkKErU+XG/ecp9Obc0DK/3AAVx/Fk8W5104m1i9PWlUZ2
   682  KlyxQ5F2alBRv9csIpl2syWQ90DMSQ1Y/R8b+kfsBG7RwDbmwGpZLQTwhE8Uga9T
   683  ZDnmwjUmWn7SD3ouyBSnbWkLE1KcbB32mz5jrwfKCPIa5ka+GIFrme1HxRoQziGo
   684  w+KU2RWu
   685  -----END CERTIFICATE-----`)
   686  
   687  var svcCrtNoSAN = []byte(`-----BEGIN CERTIFICATE-----
   688  MIIDBzCCAe+gAwIBAgIUEBND1EVKxjU7UaJ1ZBw1glkXuaswDQYJKoZIhvcNAQEL
   689  BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
   690  GA8yMjk0MDcyMzE0MjgwNVowIzEhMB8GA1UEAwwYdGVzdC1zZXJ2aWNlLnRlc3Qt
   691  bnMuc3ZjMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvDXYxvaUdbX/
   692  MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDgfSABCN4XVsrd+/A+/zQt+EyJEJxM
   693  rd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyERWimZhqbu0O+0EpFx5qLzQ5eLabLU
   694  9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zmxBay9vTY7P7sLkfZ8LifP6jgQ5NH
   695  QkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifTNE9qo2NL1iiPGTpot65z83BWeu/q
   696  WOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7FUJv53EJgmEp4UOaY1fAFtAFQQbVz
   697  tGjYGpZ22wIDAQABozkwNzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUE
   698  FjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQADggEBAMPhbecq
   699  wJtlKnSe27xQIM1bNkI/+r1aVmuJqYYbtzCaVZFnFRD6ZbCLfEo7QT17gs7ulryI
   700  yfeITEMAWG6Bq8cOhNQfXRIf2YMFHbDsFbfAEREy/jfYGw8G4b6RBVQzcuglCCB/
   701  Y0++skz8kYIR1KuZnCtC6A0kaM2XrTWCXAc5KB0Q/WO0wqqWbH/xmEYQVZmDqWOH
   702  k+qVFD+I1oT5NOzFpzaUe4T7grzoLs24IE0c+0clcc9pxTDXTfPyoLG9n3zxG0Ma
   703  hPtkUeeEK8p73Zf/F4JHQ4tJv5XY1ytWkTROE79P6qT0BY/XZSpsGmB7TIS7wFCW
   704  RfKAqN95Uso3IBI=
   705  -----END CERTIFICATE-----`)
   706  
   707  var svcKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
   708  MIIEpAIBAAKCAQEAvDXYxvaUdbX/MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDg
   709  fSABCN4XVsrd+/A+/zQt+EyJEJxMrd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyER
   710  WimZhqbu0O+0EpFx5qLzQ5eLabLU9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zm
   711  xBay9vTY7P7sLkfZ8LifP6jgQ5NHQkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifT
   712  NE9qo2NL1iiPGTpot65z83BWeu/qWOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7F
   713  UJv53EJgmEp4UOaY1fAFtAFQQbVztGjYGpZ22wIDAQABAoIBAD7Wl5buUujuJ9Jq
   714  idJaxZcOW0DP+9lqZo10sVW7xM0TQRKJHAqKue21AQPYXb81GkNor4R8QTMLjEps
   715  aFsewjs8IPhZHRQOsIluNHQLEfPgmfzP4JRC2WBsscWOkoe0idvgQeoqWcCjlZgk
   716  LSMC/v+I05qczUkZLTSMhtLQcta80OxU99kNU8Kfi6NFiAioqVQl4KlczjLJiUbK
   717  3RGOqThtjS0IzXXFr+T+bgxQkmkyAPGmx06OqqM8hdA+6WsRb8LS1XfK7qGWbU0T
   718  7mIehkcMFDRgxlDh4JfCQzWuLTax3Ds8BApJwZCBEQz8T+FbVWJpBwezyhaKBOis
   719  nQmtw8ECgYEA3E+mANY6YNVfFztMpjfh57dY2DLZY9h1yHRK13FM7EK0Z8GgMji6
   720  kDIubUBta19g3+YI4qIJgvS527ipVEHW0lYUIQ3q+JnafTC7mMxT+2J/j+lrZhrw
   721  aIPxZML29iEm64Wr3mCmUU98iy5z7EUqqKTNwr03f2eSBeO/xn6VtrsCgYEA2rL4
   722  tOJMoMDfQzAe7KIqEUn2Ob0nYP/MJZ1I8wrrdGMDhp4xofr+m99++uFPqm5u5uI5
   723  cJ6+xZQ1A6CJSKWtzOALsKN1xx+JJh9Wo2vUliDomKtarFiQO+ONLpnjuSraDMWY
   724  cKx6eXqqgit5hlQeCva2cbUP1De++3RhEpC6DmECgYA8kCiyUjH6LK3XVRXdG7+e
   725  U2i5BkF8kSTP1ig80Yiz6iJt42yGYdHnkePxZKSvv6iB5FrM8n5q4Zu2Ky1hXDgR
   726  2lfuPkU50hGeGKd5ebIciRdIGILNrton4R2a9X2ua66nUDfPCgKul4tFN5/mc50m
   727  fyeRQTLgczhRJiqyBlphwQKBgQDTnjBIH12Ug2zF/688vGHGXvIRxrVvB7XLg9lN
   728  y/gvo4uK3FIccdmijG27Zv+GY9uOL8Ly9biVSKbPvqx4jlCRmQ3WuyTBLAOyzsov
   729  0axgJLHM4KoZcI0IVlSLjj8rMorRpvWtuUe9enO5B0ZNM+HqK/Y4KsKJT/POLzur
   730  Ej3moQKBgQC+RWcly9opx0We4LG0lcdG3V0cawDRP2MmLbxHA/kSuGf5aBMJoCdf
   731  f0vRPPCK7dpPGOX9x8Oz7K7QiOEvFL3Mv1sWBEnl5lSkK8gdBhi6St9RRBGimt2H
   732  S+8g5OWupiWGF6qN+XX5WgYyuipW8mVRaROj8Vyl7JSiwu6KHfZ8RQ==
   733  -----END RSA PRIVATE KEY-----`)
   734  
   735  func TestGetContextForNewRequest(t *testing.T) {
   736  	done := make(chan struct{})
   737  	server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
   738  		<-done // never return so that we're certain to return base on timeout
   739  	}))
   740  	defer server.Close()
   741  	defer close(done)
   742  
   743  	proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
   744  		location, err := url.Parse(server.URL)
   745  		if err != nil {
   746  			t.Fatal(err)
   747  		}
   748  		location.Path = req.URL.Path
   749  
   750  		nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
   751  		newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, nestedReq)
   752  		defer cancelFn()
   753  
   754  		theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
   755  		theproxy.ServeHTTP(w, newReq)
   756  	}))
   757  	defer proxyServer.Close()
   758  
   759  	// normal clients will not be setting a timeout, don't set one here.  Our proxy logic should construct this for us
   760  	resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version")
   761  	if err != nil {
   762  		t.Fatal(err)
   763  	}
   764  	if resp.StatusCode != http.StatusServiceUnavailable {
   765  		t.Error(err)
   766  	}
   767  	body, err := io.ReadAll(resp.Body)
   768  	if err != nil {
   769  		t.Fatal(err)
   770  	}
   771  	if !strings.Contains(string(body), "context deadline exceeded") {
   772  		t.Error(string(body))
   773  	}
   774  
   775  }
   776  
   777  func TestNewRequestForProxyWithAuditID(t *testing.T) {
   778  	tests := []struct {
   779  		name    string
   780  		auditID string
   781  	}{
   782  		{
   783  			name:    "original request has Audit-ID",
   784  			auditID: "foo-bar",
   785  		},
   786  		{
   787  			name:    "original request does not have Audit-ID",
   788  			auditID: "",
   789  		},
   790  	}
   791  
   792  	for _, test := range tests {
   793  		t.Run(test.name, func(t *testing.T) {
   794  			req, err := http.NewRequest(http.MethodGet, "/api/group/version/foos/namespace/foo", nil)
   795  			if err != nil {
   796  				t.Fatalf("failed to create new http request - %v", err)
   797  			}
   798  
   799  			req = req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
   800  			if len(test.auditID) > 0 {
   801  				ctx := audit.WithAuditContext(req.Context())
   802  				audit.WithAuditID(ctx, types.UID(test.auditID))
   803  				req = req.WithContext(ctx)
   804  			}
   805  
   806  			newReq, _ := apiserverproxyutil.NewRequestForProxy(req.URL, req)
   807  			if newReq == nil {
   808  				t.Fatal("expected a non nil Request object")
   809  			}
   810  
   811  			auditIDGot := newReq.Header.Get("Audit-ID")
   812  			if test.auditID != auditIDGot {
   813  				t.Errorf("expected an Audit-ID value: %q, but got: %q", test.auditID, auditIDGot)
   814  			}
   815  		})
   816  	}
   817  }
   818  
   819  // TestProxyCertReload verifies that the proxy reloading of certificates work
   820  // to be able to test the reloading it starts a server with client auth enabled
   821  // it first uses certs that does not match the client CA so the verification fails - expecting HTTP 503
   822  // then we write correct client certs to the disk, expecting the proxy to reload the cert and use it for the next request
   823  //
   824  // Note: this test doesn't use apiserviceRegistrationController nor it doesn't start DynamicServingContentFromFiles controller
   825  // instead it manually calls to updateAPIService and RunOnce to reload the certificate
   826  func TestProxyCertReload(t *testing.T) {
   827  	// STEP 1: set up a backend server that will require the client certificate
   828  	//         this server uses clientCaCrt() to validate the client certificate
   829  	backendHandler := &targetHTTPHandler{}
   830  	backendServer := httptest.NewUnstartedServer(backendHandler)
   831  	if cert, err := tls.X509KeyPair(backendCertificate(), backendKey()); err != nil {
   832  		t.Fatal(err)
   833  	} else {
   834  		caCertPool := x509.NewCertPool()
   835  		// we're testing this while enabling MTLS
   836  		caCertPool.AppendCertsFromPEM(clientCaCrt())
   837  		backendServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}, ClientAuth: tls.RequireAndVerifyClientCert, ClientCAs: caCertPool}
   838  	}
   839  	backendServer.StartTLS()
   840  	defer backendServer.Close()
   841  
   842  	// STEP 2: set up the aggregator that will use an invalid certificate (it won't be validated by the clientCA) to auth against the backend server
   843  	aggregatorHandler := &proxyHandler{
   844  		localDelegate:   http.NewServeMux(),
   845  		serviceResolver: &mockedRouter{destinationHost: backendServer.Listener.Addr().String()},
   846  	}
   847  	certFile, keyFile, dir := getCertAndKeyPaths(t)
   848  	writeCerts(certFile, keyFile, backendCertificate(), backendKey(), t)
   849  
   850  	defer func() {
   851  		if err := os.RemoveAll(dir); err != nil {
   852  			t.Errorf("Unable to clean up test directory %q: %v", dir, err)
   853  		}
   854  	}()
   855  
   856  	certProvider, err := dynamiccertificates.NewDynamicServingContentFromFiles("test", certFile, keyFile)
   857  	if err != nil {
   858  		t.Fatalf("Unable to create dynamic certificates: %v", err)
   859  	}
   860  	ctx := context.TODO()
   861  	err = certProvider.RunOnce(ctx)
   862  	if err != nil {
   863  		t.Fatalf("Unable to load dynamic certificates: %v", err)
   864  	}
   865  	aggregatorHandler.proxyCurrentCertKeyContent = certProvider.CurrentCertKeyContent
   866  
   867  	apiService := &apiregistration.APIService{
   868  		ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
   869  		Spec: apiregistration.APIServiceSpec{
   870  			Service:  &apiregistration.ServiceReference{Name: "test-service2", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
   871  			Group:    "foo",
   872  			Version:  "v1",
   873  			CABundle: backendCaCertificate(), // used to validate backendCertificate()
   874  		},
   875  		Status: apiregistration.APIServiceStatus{
   876  			Conditions: []apiregistration.APIServiceCondition{
   877  				{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
   878  			},
   879  		},
   880  	}
   881  	aggregatorHandler.updateAPIService(apiService)
   882  
   883  	server := httptest.NewServer(contextHandler(aggregatorHandler, &user.DefaultInfo{
   884  		Name:   "username",
   885  		Groups: []string{"one", "two"},
   886  	}))
   887  	defer server.Close()
   888  
   889  	resp, err := http.Get(server.URL + "/request/path")
   890  	if err != nil {
   891  		t.Fatalf("got unexpected error: %v", err)
   892  	}
   893  	if resp.StatusCode != http.StatusServiceUnavailable {
   894  		t.Fatalf("Expected status code 503 but got %d", resp.StatusCode)
   895  	}
   896  
   897  	// STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes
   898  	//         note that this step uses the certificate that can be validated by the backend server with clientCaCrt()
   899  	writeCerts(certFile, keyFile, clientCert(), clientKey(), t)
   900  	err = certProvider.RunOnce(ctx)
   901  	if err != nil {
   902  		t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err)
   903  	}
   904  	aggregatorHandler.updateAPIService(apiService)
   905  
   906  	resp, err = http.Get(server.URL + "/request/path")
   907  	if err != nil {
   908  		t.Errorf("%v", err)
   909  	}
   910  	if resp.StatusCode != http.StatusOK {
   911  		t.Fatalf("Expected status code 200 but got %d", resp.StatusCode)
   912  	}
   913  }
   914  
   915  type fcInitSignal struct {
   916  	nSignals int32
   917  }
   918  
   919  func (s *fcInitSignal) SignalCount() int {
   920  	return int(atomic.SwapInt32(&s.nSignals, 0))
   921  }
   922  
   923  func (s *fcInitSignal) Signal() {
   924  	atomic.AddInt32(&s.nSignals, 1)
   925  }
   926  
   927  func (s *fcInitSignal) Wait() {
   928  }
   929  
   930  type hookedListener struct {
   931  	l        net.Listener
   932  	onAccept func()
   933  }
   934  
   935  func (wl *hookedListener) Accept() (net.Conn, error) {
   936  	conn, err := wl.l.Accept()
   937  	if err == nil {
   938  		wl.onAccept()
   939  	}
   940  	return conn, err
   941  }
   942  
   943  func (wl *hookedListener) Close() error {
   944  	return wl.l.Close()
   945  }
   946  
   947  func (wl *hookedListener) Addr() net.Addr {
   948  	return wl.l.Addr()
   949  }
   950  
   951  func TestFlowControlSignal(t *testing.T) {
   952  	for _, tc := range []struct {
   953  		Name           string
   954  		Local          bool
   955  		Available      bool
   956  		Request        http.Request
   957  		SignalExpected bool
   958  	}{
   959  		{
   960  			Name:           "local",
   961  			Local:          true,
   962  			SignalExpected: false,
   963  		},
   964  		{
   965  			Name:           "unavailable",
   966  			Local:          false,
   967  			Available:      false,
   968  			SignalExpected: false,
   969  		},
   970  		{
   971  			Name:           "request performed",
   972  			Local:          false,
   973  			Available:      true,
   974  			SignalExpected: true,
   975  		},
   976  		{
   977  			Name:      "upgrade request performed",
   978  			Local:     false,
   979  			Available: true,
   980  			Request: http.Request{
   981  				Header: http.Header{"Connection": []string{"Upgrade"}},
   982  			},
   983  			SignalExpected: true,
   984  		},
   985  	} {
   986  		t.Run(tc.Name, func(t *testing.T) {
   987  			okh := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   988  				w.WriteHeader(http.StatusOK)
   989  			})
   990  
   991  			var sig fcInitSignal
   992  
   993  			var signalCountOnAccept int32
   994  			backend := httptest.NewUnstartedServer(okh)
   995  			backend.Listener = &hookedListener{
   996  				l: backend.Listener,
   997  				onAccept: func() {
   998  					atomic.StoreInt32(&signalCountOnAccept, int32(sig.SignalCount()))
   999  				},
  1000  			}
  1001  			backend.Start()
  1002  			defer backend.Close()
  1003  
  1004  			p := proxyHandler{
  1005  				localDelegate:   okh,
  1006  				serviceResolver: &mockedRouter{destinationHost: backend.Listener.Addr().String()},
  1007  			}
  1008  
  1009  			server := httptest.NewServer(contextHandler(
  1010  				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1011  					p.ServeHTTP(w, r.WithContext(utilflowcontrol.WithInitializationSignal(r.Context(), &sig)))
  1012  				}),
  1013  				&user.DefaultInfo{
  1014  					Name:   "username",
  1015  					Groups: []string{"one", "two"},
  1016  				},
  1017  			))
  1018  			defer server.Close()
  1019  
  1020  			p.handlingInfo.Store(proxyHandlingInfo{
  1021  				local:             tc.Local,
  1022  				serviceAvailable:  tc.Available,
  1023  				proxyRoundTripper: backend.Client().Transport,
  1024  			})
  1025  
  1026  			surl, err := url.Parse(server.URL)
  1027  			if err != nil {
  1028  				t.Fatalf("unexpected error: %v", err)
  1029  			}
  1030  
  1031  			req := tc.Request
  1032  			req.URL = surl
  1033  			res, err := server.Client().Do(&req)
  1034  			if err != nil {
  1035  				t.Fatalf("unexpected error: %v", err)
  1036  			}
  1037  			if err := res.Body.Close(); err != nil {
  1038  				t.Fatalf("unexpected error: %v", err)
  1039  			}
  1040  
  1041  			if fired := (atomic.LoadInt32(&signalCountOnAccept) > 0); tc.SignalExpected && !fired {
  1042  				t.Errorf("flow control signal expected but not fired")
  1043  			} else if fired && !tc.SignalExpected {
  1044  				t.Errorf("flow control signal fired but not expected")
  1045  			}
  1046  		})
  1047  	}
  1048  }
  1049  
  1050  func getCertAndKeyPaths(t *testing.T) (string, string, string) {
  1051  	dir, err := os.MkdirTemp(os.TempDir(), "k8s-test-handler-proxy-cert")
  1052  	if err != nil {
  1053  		t.Fatalf("Unable to create the test directory %q: %v", dir, err)
  1054  	}
  1055  	certFile := filepath.Join(dir, "certfile.pem")
  1056  	keyFile := filepath.Join(dir, "keytfile.pem")
  1057  	return certFile, keyFile, dir
  1058  }
  1059  
  1060  func writeCerts(certFile, keyFile string, certContent, keyContent []byte, t *testing.T) {
  1061  	if err := os.WriteFile(certFile, certContent, 0600); err != nil {
  1062  		t.Fatalf("Unable to create the file %q: %v", certFile, err)
  1063  	}
  1064  	if err := os.WriteFile(keyFile, keyContent, 0600); err != nil {
  1065  		t.Fatalf("Unable to create the file %q: %v", keyFile, err)
  1066  	}
  1067  }
  1068  
  1069  func getSingleCounterValueFromRegistry(t *testing.T, r metrics.Gatherer, name string) int {
  1070  	mfs, err := r.Gather()
  1071  	if err != nil {
  1072  		t.Logf("failed to gather local registry metrics: %v", err)
  1073  		return -1
  1074  	}
  1075  
  1076  	for _, mf := range mfs {
  1077  		if mf.Name != nil && *mf.Name == name {
  1078  			mfMetric := mf.GetMetric()
  1079  			for _, m := range mfMetric {
  1080  				if m.GetCounter() != nil {
  1081  					return int(m.GetCounter().GetValue())
  1082  				}
  1083  			}
  1084  		}
  1085  	}
  1086  
  1087  	return -1
  1088  }
  1089  
  1090  func readTestFile(filename string) []byte {
  1091  	data, err := os.ReadFile("testdata/" + filename)
  1092  	if err != nil {
  1093  		panic(err)
  1094  	}
  1095  	return data
  1096  }
  1097  
  1098  // cert and ca for client auth
  1099  func clientCert() []byte { return readTestFile("client.pem") }
  1100  
  1101  func clientKey() []byte { return readTestFile("client-key.pem") }
  1102  
  1103  func backendCertificate() []byte { return readTestFile("server.pem") }
  1104  
  1105  func backendKey() []byte { return readTestFile("server-key.pem") }
  1106  
  1107  func backendCaCertificate() []byte { return readTestFile("server-ca.pem") }
  1108  
  1109  func clientCaCrt() []byte { return readTestFile("client-ca.pem") }
  1110  

View as plain text