1
16
17 package apiserver
18
19 import (
20 "context"
21 "crypto/tls"
22 "crypto/x509"
23 "fmt"
24 "io"
25 "net"
26 "net/http"
27 "net/http/httptest"
28 "net/http/httputil"
29 "net/url"
30 "os"
31 "path/filepath"
32 "reflect"
33 "strings"
34 "sync/atomic"
35 "testing"
36
37 "k8s.io/apiserver/pkg/audit"
38 "k8s.io/apiserver/pkg/server/dynamiccertificates"
39 "k8s.io/client-go/transport"
40
41 "golang.org/x/net/websocket"
42
43 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44 "k8s.io/apimachinery/pkg/types"
45 utilnet "k8s.io/apimachinery/pkg/util/net"
46 "k8s.io/apimachinery/pkg/util/proxy"
47 "k8s.io/apimachinery/pkg/util/sets"
48 "k8s.io/apiserver/pkg/authentication/user"
49 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
50 "k8s.io/apiserver/pkg/server/egressselector"
51 utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
52 apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
53 "k8s.io/component-base/metrics"
54 "k8s.io/component-base/metrics/legacyregistry"
55 apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
56 "k8s.io/utils/pointer"
57 )
58
// targetHTTPHandler is a stub backend handler that records details of the
// most recent request it served so tests can assert on what reached it.
type targetHTTPHandler struct {
	called  bool                // true once ServeHTTP has run since the last Reset
	headers map[string][]string // headers of the last request served
	path    string              // URL path of the last request served
	host    string              // Host value of the last request served
}
65
66 func (d *targetHTTPHandler) Reset() {
67 d.path = ""
68 d.called = false
69 d.headers = nil
70 d.host = ""
71 }
72
73 func (d *targetHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
74 d.path = r.URL.Path
75 d.called = true
76 d.headers = r.Header
77 d.host = r.Host
78 w.WriteHeader(http.StatusOK)
79 }
80
81 func contextHandler(handler http.Handler, user user.Info) http.Handler {
82 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
83 ctx := req.Context()
84 if user != nil {
85 ctx = genericapirequest.WithUser(ctx, user)
86 }
87 resolver := &genericapirequest.RequestInfoFactory{
88 APIPrefixes: sets.NewString("api", "apis"),
89 GrouplessAPIPrefixes: sets.NewString("api"),
90 }
91 info, err := resolver.NewRequestInfo(req)
92 if err == nil {
93 ctx = genericapirequest.WithRequestInfo(ctx, info)
94 }
95 req = req.WithContext(ctx)
96 handler.ServeHTTP(w, req)
97 })
98 }
99
// mockedRouter is a service-resolver stub that always answers with a fixed
// host and a fixed error.
type mockedRouter struct {
	destinationHost string // host placed in the URL returned by ResolveEndpoint
	err             error  // error returned by ResolveEndpoint (may be nil)
}
104
105 func (r *mockedRouter) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
106 return &url.URL{Scheme: "https", Host: r.destinationHost}, r.err
107 }
108
// emptyCert returns a non-nil, zero-length byte slice used where tests need
// a placeholder certificate or key.
func emptyCert() []byte {
	return make([]byte, 0)
}
112
113 func TestProxyHandler(t *testing.T) {
114 tests := map[string]struct {
115 user user.Info
116 path string
117 apiService *apiregistration.APIService
118
119 serviceResolver ServiceResolver
120 serviceCertOverride []byte
121 increaseSANWarnCounter bool
122
123 expectedStatusCode int
124 expectedBody string
125 expectedCalled bool
126 expectedHeaders map[string][]string
127 }{
128 "no target": {
129 expectedStatusCode: http.StatusNotFound,
130 },
131 "no user": {
132 apiService: &apiregistration.APIService{
133 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
134 Spec: apiregistration.APIServiceSpec{
135 Service: &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
136 Group: "foo",
137 Version: "v1",
138 },
139 Status: apiregistration.APIServiceStatus{
140 Conditions: []apiregistration.APIServiceCondition{
141 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
142 },
143 },
144 },
145 expectedStatusCode: http.StatusInternalServerError,
146 expectedBody: "missing user",
147 },
148 "proxy with user, insecure": {
149 user: &user.DefaultInfo{
150 Name: "username",
151 Groups: []string{"one", "two"},
152 },
153 path: "/request/path",
154 apiService: &apiregistration.APIService{
155 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
156 Spec: apiregistration.APIServiceSpec{
157 Service: &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
158 Group: "foo",
159 Version: "v1",
160 InsecureSkipTLSVerify: true,
161 },
162 Status: apiregistration.APIServiceStatus{
163 Conditions: []apiregistration.APIServiceCondition{
164 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
165 },
166 },
167 },
168 expectedStatusCode: http.StatusOK,
169 expectedCalled: true,
170 expectedHeaders: map[string][]string{
171 "X-Forwarded-Proto": {"https"},
172 "X-Forwarded-Uri": {"/request/path"},
173 "X-Forwarded-For": {"127.0.0.1"},
174 "X-Remote-User": {"username"},
175 "User-Agent": {"Go-http-client/1.1"},
176 "Accept-Encoding": {"gzip"},
177 "X-Remote-Group": {"one", "two"},
178 },
179 },
180 "proxy with user, cabundle": {
181 user: &user.DefaultInfo{
182 Name: "username",
183 Groups: []string{"one", "two"},
184 },
185 path: "/request/path",
186 apiService: &apiregistration.APIService{
187 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
188 Spec: apiregistration.APIServiceSpec{
189 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
190 Group: "foo",
191 Version: "v1",
192 CABundle: testCACrt,
193 },
194 Status: apiregistration.APIServiceStatus{
195 Conditions: []apiregistration.APIServiceCondition{
196 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
197 },
198 },
199 },
200 expectedStatusCode: http.StatusOK,
201 expectedCalled: true,
202 expectedHeaders: map[string][]string{
203 "X-Forwarded-Proto": {"https"},
204 "X-Forwarded-Uri": {"/request/path"},
205 "X-Forwarded-For": {"127.0.0.1"},
206 "X-Remote-User": {"username"},
207 "User-Agent": {"Go-http-client/1.1"},
208 "Accept-Encoding": {"gzip"},
209 "X-Remote-Group": {"one", "two"},
210 },
211 },
212 "service unavailable": {
213 user: &user.DefaultInfo{
214 Name: "username",
215 Groups: []string{"one", "two"},
216 },
217 path: "/request/path",
218 apiService: &apiregistration.APIService{
219 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
220 Spec: apiregistration.APIServiceSpec{
221 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
222 Group: "foo",
223 Version: "v1",
224 CABundle: testCACrt,
225 },
226 Status: apiregistration.APIServiceStatus{
227 Conditions: []apiregistration.APIServiceCondition{
228 {Type: apiregistration.Available, Status: apiregistration.ConditionFalse},
229 },
230 },
231 },
232 expectedStatusCode: http.StatusServiceUnavailable,
233 },
234 "service unresolveable": {
235 user: &user.DefaultInfo{
236 Name: "username",
237 Groups: []string{"one", "two"},
238 },
239 path: "/request/path",
240 serviceResolver: &mockedRouter{err: fmt.Errorf("unresolveable")},
241 apiService: &apiregistration.APIService{
242 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
243 Spec: apiregistration.APIServiceSpec{
244 Service: &apiregistration.ServiceReference{Name: "bad-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
245 Group: "foo",
246 Version: "v1",
247 CABundle: testCACrt,
248 },
249 Status: apiregistration.APIServiceStatus{
250 Conditions: []apiregistration.APIServiceCondition{
251 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
252 },
253 },
254 },
255 expectedStatusCode: http.StatusServiceUnavailable,
256 },
257 "fail on bad serving cert": {
258 user: &user.DefaultInfo{
259 Name: "username",
260 Groups: []string{"one", "two"},
261 },
262 path: "/request/path",
263 apiService: &apiregistration.APIService{
264 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
265 Spec: apiregistration.APIServiceSpec{
266 Service: &apiregistration.ServiceReference{Port: pointer.Int32Ptr(443)},
267 Group: "foo",
268 Version: "v1",
269 },
270 Status: apiregistration.APIServiceStatus{
271 Conditions: []apiregistration.APIServiceCondition{
272 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
273 },
274 },
275 },
276 expectedStatusCode: http.StatusServiceUnavailable,
277 },
278 "fail on bad serving cert w/o SAN and increase SAN error counter metrics": {
279 user: &user.DefaultInfo{
280 Name: "username",
281 Groups: []string{"one", "two"},
282 },
283 path: "/request/path",
284 apiService: &apiregistration.APIService{
285 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
286 Spec: apiregistration.APIServiceSpec{
287 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
288 Group: "foo",
289 Version: "v1",
290 CABundle: testCACrt,
291 },
292 Status: apiregistration.APIServiceStatus{
293 Conditions: []apiregistration.APIServiceCondition{
294 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
295 },
296 },
297 },
298 serviceCertOverride: svcCrtNoSAN,
299 increaseSANWarnCounter: true,
300 expectedStatusCode: http.StatusServiceUnavailable,
301 },
302 }
303
304 target := &targetHTTPHandler{}
305 for name, tc := range tests {
306 target.Reset()
307 legacyregistry.Reset()
308
309 func() {
310 targetServer := httptest.NewUnstartedServer(target)
311 serviceCert := tc.serviceCertOverride
312 if serviceCert == nil {
313 serviceCert = svcCrt
314 }
315 if cert, err := tls.X509KeyPair(serviceCert, svcKey); err != nil {
316 t.Fatal(err)
317 } else {
318 targetServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
319 }
320 targetServer.StartTLS()
321 defer targetServer.Close()
322
323 serviceResolver := tc.serviceResolver
324 if serviceResolver == nil {
325 serviceResolver = &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}
326 }
327 handler := &proxyHandler{
328 localDelegate: http.NewServeMux(),
329 serviceResolver: serviceResolver,
330 proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
331 }
332 server := httptest.NewServer(contextHandler(handler, tc.user))
333 defer server.Close()
334
335 if tc.apiService != nil {
336 handler.updateAPIService(tc.apiService)
337 curr := handler.handlingInfo.Load().(proxyHandlingInfo)
338 handler.handlingInfo.Store(curr)
339 }
340
341 resp, err := http.Get(server.URL + tc.path)
342 if err != nil {
343 t.Errorf("%s: %v", name, err)
344 return
345 }
346 if e, a := tc.expectedStatusCode, resp.StatusCode; e != a {
347 body, _ := httputil.DumpResponse(resp, true)
348 t.Logf("%s: %v", name, string(body))
349 t.Errorf("%s: expected %v, got %v", name, e, a)
350 return
351 }
352 bytes, err := io.ReadAll(resp.Body)
353 if err != nil {
354 t.Errorf("%s: %v", name, err)
355 return
356 }
357 if !strings.Contains(string(bytes), tc.expectedBody) {
358 t.Errorf("%s: expected %q, got %q", name, tc.expectedBody, string(bytes))
359 return
360 }
361
362 if e, a := tc.expectedCalled, target.called; e != a {
363 t.Errorf("%s: expected %v, got %v", name, e, a)
364 return
365 }
366
367 delete(target.headers, "X-Forwarded-Host")
368 if e, a := tc.expectedHeaders, target.headers; !reflect.DeepEqual(e, a) {
369 t.Errorf("%s: expected %v, got %v", name, e, a)
370 return
371 }
372 if e, a := targetServer.Listener.Addr().String(), target.host; tc.expectedCalled && !reflect.DeepEqual(e, a) {
373 t.Errorf("%s: expected %v, got %v", name, e, a)
374 return
375 }
376
377 if tc.increaseSANWarnCounter {
378 errorCounter := getSingleCounterValueFromRegistry(t, legacyregistry.DefaultGatherer, "apiserver_kube_aggregator_x509_missing_san_total")
379 if errorCounter == -1 {
380 t.Errorf("failed to get the x509_missing_san_total metrics: %v", err)
381 }
382 if int(errorCounter) != 1 {
383 t.Errorf("expected the x509_missing_san_total to be 1, but it's %d", errorCounter)
384 }
385 }
386 }()
387 }
388 }
389
// mockEgressDialer provides dial functions that count how often they are
// invoked.
type mockEgressDialer struct {
	called int // number of dial / dialBroken invocations
}
393
394 func (m *mockEgressDialer) dial(ctx context.Context, net, addr string) (net.Conn, error) {
395 m.called++
396 return http.DefaultTransport.(*http.Transport).DialContext(ctx, net, addr)
397 }
398
399 func (m *mockEgressDialer) dialBroken(ctx context.Context, net, addr string) (net.Conn, error) {
400 m.called++
401 return nil, fmt.Errorf("Broken dialer")
402 }
403
404 func newDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
405 dialer := &mockEgressDialer{}
406 m := make(map[egressselector.EgressType]utilnet.DialFunc)
407 m[egressselector.Cluster] = dialer.dial
408 es := egressselector.NewEgressSelectorWithMap(m)
409 return dialer, es
410 }
411
412 func newBrokenDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
413 dialer := &mockEgressDialer{}
414 m := make(map[egressselector.EgressType]utilnet.DialFunc)
415 m[egressselector.Cluster] = dialer.dialBroken
416 es := egressselector.NewEgressSelectorWithMap(m)
417 return dialer, es
418 }
419
420 func TestProxyUpgrade(t *testing.T) {
421 upgradeUser := "upgradeUser"
422 testcases := map[string]struct {
423 APIService *apiregistration.APIService
424 NewEgressSelector func() (*mockEgressDialer, *egressselector.EgressSelector)
425 ExpectError bool
426 ExpectCalled bool
427 }{
428 "valid hostname + CABundle": {
429 APIService: &apiregistration.APIService{
430 Spec: apiregistration.APIServiceSpec{
431 CABundle: testCACrt,
432 Group: "mygroup",
433 Version: "v1",
434 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
435 },
436 Status: apiregistration.APIServiceStatus{
437 Conditions: []apiregistration.APIServiceCondition{
438 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
439 },
440 },
441 },
442 ExpectError: false,
443 ExpectCalled: true,
444 },
445 "invalid hostname + insecure": {
446 APIService: &apiregistration.APIService{
447 Spec: apiregistration.APIServiceSpec{
448 InsecureSkipTLSVerify: true,
449 Group: "mygroup",
450 Version: "v1",
451 Service: &apiregistration.ServiceReference{Name: "invalid-service", Namespace: "invalid-ns", Port: pointer.Int32Ptr(443)},
452 },
453 Status: apiregistration.APIServiceStatus{
454 Conditions: []apiregistration.APIServiceCondition{
455 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
456 },
457 },
458 },
459 ExpectError: false,
460 ExpectCalled: true,
461 },
462 "invalid hostname + CABundle": {
463 APIService: &apiregistration.APIService{
464 Spec: apiregistration.APIServiceSpec{
465 CABundle: testCACrt,
466 Group: "mygroup",
467 Version: "v1",
468 Service: &apiregistration.ServiceReference{Name: "invalid-service", Namespace: "invalid-ns", Port: pointer.Int32Ptr(443)},
469 },
470 Status: apiregistration.APIServiceStatus{
471 Conditions: []apiregistration.APIServiceCondition{
472 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
473 },
474 },
475 },
476 ExpectError: true,
477 ExpectCalled: false,
478 },
479 "valid hostname + CABundle + egress selector": {
480 APIService: &apiregistration.APIService{
481 Spec: apiregistration.APIServiceSpec{
482 CABundle: testCACrt,
483 Group: "mygroup",
484 Version: "v1",
485 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
486 },
487 Status: apiregistration.APIServiceStatus{
488 Conditions: []apiregistration.APIServiceCondition{
489 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
490 },
491 },
492 },
493 NewEgressSelector: newDialerAndSelector,
494 ExpectError: false,
495 ExpectCalled: true,
496 },
497 "valid hostname + CABundle + egress selector non working": {
498 APIService: &apiregistration.APIService{
499 Spec: apiregistration.APIServiceSpec{
500 CABundle: testCACrt,
501 Group: "mygroup",
502 Version: "v1",
503 Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
504 },
505 Status: apiregistration.APIServiceStatus{
506 Conditions: []apiregistration.APIServiceCondition{
507 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
508 },
509 },
510 },
511 NewEgressSelector: newBrokenDialerAndSelector,
512 ExpectError: true,
513 ExpectCalled: false,
514 },
515 }
516
517 for k, tc := range testcases {
518 tcName := k
519 t.Run(tcName, func(t *testing.T) {
520 path := "/apis/" + tc.APIService.Spec.Group + "/" + tc.APIService.Spec.Version + "/foo"
521 timesCalled := int32(0)
522 backendHandler := http.NewServeMux()
523 backendHandler.Handle(path, websocket.Handler(func(ws *websocket.Conn) {
524 atomic.AddInt32(×Called, 1)
525 defer ws.Close()
526 req := ws.Request()
527 user := req.Header.Get("X-Remote-User")
528 if user != upgradeUser {
529 t.Errorf("expected user %q, got %q", upgradeUser, user)
530 }
531 body := make([]byte, 5)
532 ws.Read(body)
533 ws.Write([]byte("hello " + string(body)))
534 }))
535
536 backendServer := httptest.NewUnstartedServer(backendHandler)
537 cert, err := tls.X509KeyPair(svcCrt, svcKey)
538 if err != nil {
539 t.Errorf("https (valid hostname): %v", err)
540 return
541 }
542 backendServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
543 backendServer.StartTLS()
544 defer backendServer.Close()
545
546 defer func() {
547 if called := atomic.LoadInt32(×Called) > 0; called != tc.ExpectCalled {
548 t.Errorf("%s: expected called=%v, got %v", tcName, tc.ExpectCalled, called)
549 }
550 }()
551
552 serverURL, _ := url.Parse(backendServer.URL)
553 proxyHandler := &proxyHandler{
554 serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
555 proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
556 }
557
558 var dialer *mockEgressDialer
559 var selector *egressselector.EgressSelector
560 if tc.NewEgressSelector != nil {
561 dialer, selector = tc.NewEgressSelector()
562
563 egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
564 if err != nil {
565 t.Fatal(err)
566 }
567 if egressDialer != nil {
568 proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
569 }
570 }
571
572 proxyHandler.updateAPIService(tc.APIService)
573 aggregator := httptest.NewServer(contextHandler(proxyHandler, &user.DefaultInfo{Name: upgradeUser}))
574 defer aggregator.Close()
575
576 ws, err := websocket.Dial("ws://"+aggregator.Listener.Addr().String()+path, "", "http://127.0.0.1/")
577 if err != nil {
578 if !tc.ExpectError {
579 t.Errorf("%s: websocket dial err: %s", tcName, err)
580 }
581 return
582 }
583 defer ws.Close()
584
585
586 if dialer != nil && dialer.called != 1 {
587 t.Errorf("expect egress dialer gets called %d times, got %d", 1, dialer.called)
588 }
589
590 if tc.ExpectError {
591 t.Errorf("%s: expected websocket error, got none", tcName)
592 return
593 }
594
595 if _, err := ws.Write([]byte("world")); err != nil {
596 t.Errorf("%s: write err: %s", tcName, err)
597 return
598 }
599
600 response := make([]byte, 20)
601 n, err := ws.Read(response)
602 if err != nil {
603 t.Errorf("%s: read err: %s", tcName, err)
604 return
605 }
606 if e, a := "hello world", string(response[0:n]); e != a {
607 t.Errorf("%s: expected '%#v', got '%#v'", tcName, e, a)
608 return
609 }
610 })
611 }
612 }
613
// testCACrt is a PEM-encoded certificate that the tests in this file pass as
// the APIService CABundle.
// NOTE(review): presumably the CA that issued svcCrt and svcCrtNoSAN — confirm.
var testCACrt = []byte(`-----BEGIN CERTIFICATE-----
MIIDGTCCAgGgAwIBAgIUAlOGbZ9MSBRFDMq483nGW7h4YNIwDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
GA8yMjk0MDcyMzE0MjgwNVowGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANQF9aox1wJlB7wrFeEDYlRk
2AIfC28PZYjW3LsW7/gas2ImmRpzdZYq3nNFQwF67sUudeuuNNAvEngb8Q1wojG7
Uftt52c9e0Hi5LDxElWV3Tw1XyZFJsk5uwVNb377r7CDfTX3WUsX1WlUeUF6xmwE
M4jYQJ9pMPNUOEWpe7G8daTYineTVvrHvGpxVMMSpOWTWy4+oqWaz5tfFSbyvNZT
+eOLNkDo441KfXvb66zWV4AEfB2QDyGGMuPUT/FgsZHNuj/WNjt3bWvyey9ZGlDm
LPnJgbzEP1FnfIdtuSpHhbWox2Jnuht4hCwhTW1lcAi68MSQEs8KqptEhIJoIxkC
AwEAAaNTMFEwHQYDVR0OBBYEFJnGJQd3VkQP5cZLB1n9/FRKyBLPMB8GA1UdIwQY
MBaAFJnGJQd3VkQP5cZLB1n9/FRKyBLPMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggEBALwqR2oo3v5Ghs9hS1YQIqegQ/IGZqQwiRz2HFTUGzf5+nUY
BpZHQPapLJ6Ki687rY4nkdEAMCeZjefBFc6uawo6rY4O8IiJAQbDprNNK8oerwiM
BWSDDDjoNxMZMCegSAv39YSonecKZsg7+l1K/nmuQNehgHNem71ZroaRCFvJJ59E
WSd3QP+Gh9iKabsDnkBrTk5KFa7X24c43DJ23kPE49NOwBhiM6Fs8q+tdzWzaVSb
56uXONZxYmFH5yDFvnBIqk2Fys5Klsn6IsM1BCgH2snbA6kwh9Kph4pLdAVGyR9i
MxfBxx4eUypOzIBGqa3OmvMcuNElBe8fcUtpqO0=
-----END CERTIFICATE-----`)
633
634
663
664
665
// svcCrt is the PEM-encoded serving certificate that the test backends load
// together with svcKey; it is the default when a test case sets no
// certificate override.
var svcCrt = []byte(`-----BEGIN CERTIFICATE-----
MIIDMjCCAhqgAwIBAgIUEBND1EVKxjU7UaJ1ZBw1glkXuaowDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
GA8yMjk0MDcyMzE0MjgwNVowIzEhMB8GA1UEAwwYdGVzdC1zZXJ2aWNlLnRlc3Qt
bnMuc3ZjMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvDXYxvaUdbX/
MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDgfSABCN4XVsrd+/A+/zQt+EyJEJxM
rd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyERWimZhqbu0O+0EpFx5qLzQ5eLabLU
9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zmxBay9vTY7P7sLkfZ8LifP6jgQ5NH
QkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifTNE9qo2NL1iiPGTpot65z83BWeu/q
WOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7FUJv53EJgmEp4UOaY1fAFtAFQQbVz
tGjYGpZ22wIDAQABo2QwYjAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUE
FjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwKQYDVR0RBCIwIIcEfwAAAYIYdGVzdC1z
ZXJ2aWNlLnRlc3QtbnMuc3ZjMA0GCSqGSIb3DQEBCwUAA4IBAQCw/EoFXFahLC4g
4iq9VWhnCmAqUv6IuJqOMC+qEH7fSB3UDAjL4A2iuNJaBAxhI2bccoP2wtqZCkHH
0YLyoKOPjgl6VZtByco8Su7T9yOaef6aX1OP4Snm/aeYdVbjSBKVwMywmmb34XFa
azChi6sq4TFPNesUUoEGkKErU+XG/ecp9Obc0DK/3AAVx/Fk8W5104m1i9PWlUZ2
KlyxQ5F2alBRv9csIpl2syWQ90DMSQ1Y/R8b+kfsBG7RwDbmwGpZLQTwhE8Uga9T
ZDnmwjUmWn7SD3ouyBSnbWkLE1KcbB32mz5jrwfKCPIa5ka+GIFrme1HxRoQziGo
w+KU2RWu
-----END CERTIFICATE-----`)
686
// svcCrtNoSAN is a PEM-encoded serving certificate used as the certificate
// override in the "w/o SAN" proxy test case; it is loaded with svcKey.
// NOTE(review): per its name this certificate carries no subject alternative
// names — confirm against the encoded contents.
var svcCrtNoSAN = []byte(`-----BEGIN CERTIFICATE-----
MIIDBzCCAe+gAwIBAgIUEBND1EVKxjU7UaJ1ZBw1glkXuaswDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAwwQd2ViaG9va190ZXN0c19jYTAgFw0yMDEwMDcxNDI4MDVa
GA8yMjk0MDcyMzE0MjgwNVowIzEhMB8GA1UEAwwYdGVzdC1zZXJ2aWNlLnRlc3Qt
bnMuc3ZjMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvDXYxvaUdbX/
MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDgfSABCN4XVsrd+/A+/zQt+EyJEJxM
rd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyERWimZhqbu0O+0EpFx5qLzQ5eLabLU
9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zmxBay9vTY7P7sLkfZ8LifP6jgQ5NH
QkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifTNE9qo2NL1iiPGTpot65z83BWeu/q
WOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7FUJv53EJgmEp4UOaY1fAFtAFQQbVz
tGjYGpZ22wIDAQABozkwNzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUE
FjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQADggEBAMPhbecq
wJtlKnSe27xQIM1bNkI/+r1aVmuJqYYbtzCaVZFnFRD6ZbCLfEo7QT17gs7ulryI
yfeITEMAWG6Bq8cOhNQfXRIf2YMFHbDsFbfAEREy/jfYGw8G4b6RBVQzcuglCCB/
Y0++skz8kYIR1KuZnCtC6A0kaM2XrTWCXAc5KB0Q/WO0wqqWbH/xmEYQVZmDqWOH
k+qVFD+I1oT5NOzFpzaUe4T7grzoLs24IE0c+0clcc9pxTDXTfPyoLG9n3zxG0Ma
hPtkUeeEK8p73Zf/F4JHQ4tJv5XY1ytWkTROE79P6qT0BY/XZSpsGmB7TIS7wFCW
RfKAqN95Uso3IBI=
-----END CERTIFICATE-----`)
706
// svcKey is the PEM-encoded RSA private key that the test backends pair with
// svcCrt or svcCrtNoSAN when building their TLS certificate. Test fixture
// only.
var svcKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAvDXYxvaUdbX/MA3+3SdYY4o8Jl2s1PW9MX4Mr/nCNltyOKDg
fSABCN4XVsrd+/A+/zQt+EyJEJxMrd1syhzd/TJAnGzexmZg/dIi0jC3oBe/qyER
WimZhqbu0O+0EpFx5qLzQ5eLabLU9CtBwRSyYQjqsDmPoqplsKxaFF9NIFQrh1zm
xBay9vTY7P7sLkfZ8LifP6jgQ5NHQkjaY9XCMzYbcrzbc2r9vxTm//IR1cWxaifT
NE9qo2NL1iiPGTpot65z83BWeu/qWOU+aGUhY/xcZH0w/rUJ7ffviyd94EY4IN7F
UJv53EJgmEp4UOaY1fAFtAFQQbVztGjYGpZ22wIDAQABAoIBAD7Wl5buUujuJ9Jq
idJaxZcOW0DP+9lqZo10sVW7xM0TQRKJHAqKue21AQPYXb81GkNor4R8QTMLjEps
aFsewjs8IPhZHRQOsIluNHQLEfPgmfzP4JRC2WBsscWOkoe0idvgQeoqWcCjlZgk
LSMC/v+I05qczUkZLTSMhtLQcta80OxU99kNU8Kfi6NFiAioqVQl4KlczjLJiUbK
3RGOqThtjS0IzXXFr+T+bgxQkmkyAPGmx06OqqM8hdA+6WsRb8LS1XfK7qGWbU0T
7mIehkcMFDRgxlDh4JfCQzWuLTax3Ds8BApJwZCBEQz8T+FbVWJpBwezyhaKBOis
nQmtw8ECgYEA3E+mANY6YNVfFztMpjfh57dY2DLZY9h1yHRK13FM7EK0Z8GgMji6
kDIubUBta19g3+YI4qIJgvS527ipVEHW0lYUIQ3q+JnafTC7mMxT+2J/j+lrZhrw
aIPxZML29iEm64Wr3mCmUU98iy5z7EUqqKTNwr03f2eSBeO/xn6VtrsCgYEA2rL4
tOJMoMDfQzAe7KIqEUn2Ob0nYP/MJZ1I8wrrdGMDhp4xofr+m99++uFPqm5u5uI5
cJ6+xZQ1A6CJSKWtzOALsKN1xx+JJh9Wo2vUliDomKtarFiQO+ONLpnjuSraDMWY
cKx6eXqqgit5hlQeCva2cbUP1De++3RhEpC6DmECgYA8kCiyUjH6LK3XVRXdG7+e
U2i5BkF8kSTP1ig80Yiz6iJt42yGYdHnkePxZKSvv6iB5FrM8n5q4Zu2Ky1hXDgR
2lfuPkU50hGeGKd5ebIciRdIGILNrton4R2a9X2ua66nUDfPCgKul4tFN5/mc50m
fyeRQTLgczhRJiqyBlphwQKBgQDTnjBIH12Ug2zF/688vGHGXvIRxrVvB7XLg9lN
y/gvo4uK3FIccdmijG27Zv+GY9uOL8Ly9biVSKbPvqx4jlCRmQ3WuyTBLAOyzsov
0axgJLHM4KoZcI0IVlSLjj8rMorRpvWtuUe9enO5B0ZNM+HqK/Y4KsKJT/POLzur
Ej3moQKBgQC+RWcly9opx0We4LG0lcdG3V0cawDRP2MmLbxHA/kSuGf5aBMJoCdf
f0vRPPCK7dpPGOX9x8Oz7K7QiOEvFL3Mv1sWBEnl5lSkK8gdBhi6St9RRBGimt2H
S+8g5OWupiWGF6qN+XX5WgYyuipW8mVRaROj8Vyl7JSiwu6KHfZ8RQ==
-----END RSA PRIVATE KEY-----`)
734
735 func TestGetContextForNewRequest(t *testing.T) {
736 done := make(chan struct{})
737 server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
738 <-done
739 }))
740 defer server.Close()
741 defer close(done)
742
743 proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
744 location, err := url.Parse(server.URL)
745 if err != nil {
746 t.Fatal(err)
747 }
748 location.Path = req.URL.Path
749
750 nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
751 newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, nestedReq)
752 defer cancelFn()
753
754 theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
755 theproxy.ServeHTTP(w, newReq)
756 }))
757 defer proxyServer.Close()
758
759
760 resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version")
761 if err != nil {
762 t.Fatal(err)
763 }
764 if resp.StatusCode != http.StatusServiceUnavailable {
765 t.Error(err)
766 }
767 body, err := io.ReadAll(resp.Body)
768 if err != nil {
769 t.Fatal(err)
770 }
771 if !strings.Contains(string(body), "context deadline exceeded") {
772 t.Error(string(body))
773 }
774
775 }
776
777 func TestNewRequestForProxyWithAuditID(t *testing.T) {
778 tests := []struct {
779 name string
780 auditID string
781 }{
782 {
783 name: "original request has Audit-ID",
784 auditID: "foo-bar",
785 },
786 {
787 name: "original request does not have Audit-ID",
788 auditID: "",
789 },
790 }
791
792 for _, test := range tests {
793 t.Run(test.name, func(t *testing.T) {
794 req, err := http.NewRequest(http.MethodGet, "/api/group/version/foos/namespace/foo", nil)
795 if err != nil {
796 t.Fatalf("failed to create new http request - %v", err)
797 }
798
799 req = req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
800 if len(test.auditID) > 0 {
801 ctx := audit.WithAuditContext(req.Context())
802 audit.WithAuditID(ctx, types.UID(test.auditID))
803 req = req.WithContext(ctx)
804 }
805
806 newReq, _ := apiserverproxyutil.NewRequestForProxy(req.URL, req)
807 if newReq == nil {
808 t.Fatal("expected a non nil Request object")
809 }
810
811 auditIDGot := newReq.Header.Get("Audit-ID")
812 if test.auditID != auditIDGot {
813 t.Errorf("expected an Audit-ID value: %q, but got: %q", test.auditID, auditIDGot)
814 }
815 })
816 }
817 }
818
819
820
821
822
823
824
825
826 func TestProxyCertReload(t *testing.T) {
827
828
829 backendHandler := &targetHTTPHandler{}
830 backendServer := httptest.NewUnstartedServer(backendHandler)
831 if cert, err := tls.X509KeyPair(backendCertificate(), backendKey()); err != nil {
832 t.Fatal(err)
833 } else {
834 caCertPool := x509.NewCertPool()
835
836 caCertPool.AppendCertsFromPEM(clientCaCrt())
837 backendServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}, ClientAuth: tls.RequireAndVerifyClientCert, ClientCAs: caCertPool}
838 }
839 backendServer.StartTLS()
840 defer backendServer.Close()
841
842
843 aggregatorHandler := &proxyHandler{
844 localDelegate: http.NewServeMux(),
845 serviceResolver: &mockedRouter{destinationHost: backendServer.Listener.Addr().String()},
846 }
847 certFile, keyFile, dir := getCertAndKeyPaths(t)
848 writeCerts(certFile, keyFile, backendCertificate(), backendKey(), t)
849
850 defer func() {
851 if err := os.RemoveAll(dir); err != nil {
852 t.Errorf("Unable to clean up test directory %q: %v", dir, err)
853 }
854 }()
855
856 certProvider, err := dynamiccertificates.NewDynamicServingContentFromFiles("test", certFile, keyFile)
857 if err != nil {
858 t.Fatalf("Unable to create dynamic certificates: %v", err)
859 }
860 ctx := context.TODO()
861 err = certProvider.RunOnce(ctx)
862 if err != nil {
863 t.Fatalf("Unable to load dynamic certificates: %v", err)
864 }
865 aggregatorHandler.proxyCurrentCertKeyContent = certProvider.CurrentCertKeyContent
866
867 apiService := &apiregistration.APIService{
868 ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
869 Spec: apiregistration.APIServiceSpec{
870 Service: &apiregistration.ServiceReference{Name: "test-service2", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
871 Group: "foo",
872 Version: "v1",
873 CABundle: backendCaCertificate(),
874 },
875 Status: apiregistration.APIServiceStatus{
876 Conditions: []apiregistration.APIServiceCondition{
877 {Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
878 },
879 },
880 }
881 aggregatorHandler.updateAPIService(apiService)
882
883 server := httptest.NewServer(contextHandler(aggregatorHandler, &user.DefaultInfo{
884 Name: "username",
885 Groups: []string{"one", "two"},
886 }))
887 defer server.Close()
888
889 resp, err := http.Get(server.URL + "/request/path")
890 if err != nil {
891 t.Fatalf("got unexpected error: %v", err)
892 }
893 if resp.StatusCode != http.StatusServiceUnavailable {
894 t.Fatalf("Expected status code 503 but got %d", resp.StatusCode)
895 }
896
897
898
899 writeCerts(certFile, keyFile, clientCert(), clientKey(), t)
900 err = certProvider.RunOnce(ctx)
901 if err != nil {
902 t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err)
903 }
904 aggregatorHandler.updateAPIService(apiService)
905
906 resp, err = http.Get(server.URL + "/request/path")
907 if err != nil {
908 t.Errorf("%v", err)
909 }
910 if resp.StatusCode != http.StatusOK {
911 t.Fatalf("Expected status code 200 but got %d", resp.StatusCode)
912 }
913 }
914
// fcInitSignal counts how many times Signal has been called; the counter is
// accessed atomically by its methods.
type fcInitSignal struct {
	nSignals int32 // Signal calls since the last SignalCount
}
918
919 func (s *fcInitSignal) SignalCount() int {
920 return int(atomic.SwapInt32(&s.nSignals, 0))
921 }
922
923 func (s *fcInitSignal) Signal() {
924 atomic.AddInt32(&s.nSignals, 1)
925 }
926
927 func (s *fcInitSignal) Wait() {
928 }
929
// hookedListener wraps a net.Listener and invokes a callback whenever a
// connection is accepted successfully.
type hookedListener struct {
	l        net.Listener // underlying listener that all calls delegate to
	onAccept func()       // called after each successful Accept
}
934
935 func (wl *hookedListener) Accept() (net.Conn, error) {
936 conn, err := wl.l.Accept()
937 if err == nil {
938 wl.onAccept()
939 }
940 return conn, err
941 }
942
943 func (wl *hookedListener) Close() error {
944 return wl.l.Close()
945 }
946
947 func (wl *hookedListener) Addr() net.Addr {
948 return wl.l.Addr()
949 }
950
951 func TestFlowControlSignal(t *testing.T) {
952 for _, tc := range []struct {
953 Name string
954 Local bool
955 Available bool
956 Request http.Request
957 SignalExpected bool
958 }{
959 {
960 Name: "local",
961 Local: true,
962 SignalExpected: false,
963 },
964 {
965 Name: "unavailable",
966 Local: false,
967 Available: false,
968 SignalExpected: false,
969 },
970 {
971 Name: "request performed",
972 Local: false,
973 Available: true,
974 SignalExpected: true,
975 },
976 {
977 Name: "upgrade request performed",
978 Local: false,
979 Available: true,
980 Request: http.Request{
981 Header: http.Header{"Connection": []string{"Upgrade"}},
982 },
983 SignalExpected: true,
984 },
985 } {
986 t.Run(tc.Name, func(t *testing.T) {
987 okh := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
988 w.WriteHeader(http.StatusOK)
989 })
990
991 var sig fcInitSignal
992
993 var signalCountOnAccept int32
994 backend := httptest.NewUnstartedServer(okh)
995 backend.Listener = &hookedListener{
996 l: backend.Listener,
997 onAccept: func() {
998 atomic.StoreInt32(&signalCountOnAccept, int32(sig.SignalCount()))
999 },
1000 }
1001 backend.Start()
1002 defer backend.Close()
1003
1004 p := proxyHandler{
1005 localDelegate: okh,
1006 serviceResolver: &mockedRouter{destinationHost: backend.Listener.Addr().String()},
1007 }
1008
1009 server := httptest.NewServer(contextHandler(
1010 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1011 p.ServeHTTP(w, r.WithContext(utilflowcontrol.WithInitializationSignal(r.Context(), &sig)))
1012 }),
1013 &user.DefaultInfo{
1014 Name: "username",
1015 Groups: []string{"one", "two"},
1016 },
1017 ))
1018 defer server.Close()
1019
1020 p.handlingInfo.Store(proxyHandlingInfo{
1021 local: tc.Local,
1022 serviceAvailable: tc.Available,
1023 proxyRoundTripper: backend.Client().Transport,
1024 })
1025
1026 surl, err := url.Parse(server.URL)
1027 if err != nil {
1028 t.Fatalf("unexpected error: %v", err)
1029 }
1030
1031 req := tc.Request
1032 req.URL = surl
1033 res, err := server.Client().Do(&req)
1034 if err != nil {
1035 t.Fatalf("unexpected error: %v", err)
1036 }
1037 if err := res.Body.Close(); err != nil {
1038 t.Fatalf("unexpected error: %v", err)
1039 }
1040
1041 if fired := (atomic.LoadInt32(&signalCountOnAccept) > 0); tc.SignalExpected && !fired {
1042 t.Errorf("flow control signal expected but not fired")
1043 } else if fired && !tc.SignalExpected {
1044 t.Errorf("flow control signal fired but not expected")
1045 }
1046 })
1047 }
1048 }
1049
// getCertAndKeyPaths creates a fresh temporary directory and returns, in
// order, the certificate file path, the key file path, and the directory
// itself. The files are not created, and the caller is responsible for
// removing the directory. The test is aborted if the directory cannot be
// created.
func getCertAndKeyPaths(t *testing.T) (string, string, string) {
	t.Helper()

	const dirPattern = "k8s-test-handler-proxy-cert"
	dir, err := os.MkdirTemp(os.TempDir(), dirPattern)
	if err != nil {
		// dir is empty on failure, so report where creation was attempted.
		t.Fatalf("Unable to create the test directory %q under %q: %v", dirPattern, os.TempDir(), err)
	}
	certFile := filepath.Join(dir, "certfile.pem")
	keyFile := filepath.Join(dir, "keytfile.pem")
	return certFile, keyFile, dir
}
1059
// writeCerts writes certContent to certFile and then keyContent to keyFile,
// both with mode 0600, aborting the test on the first write that fails.
func writeCerts(certFile, keyFile string, certContent, keyContent []byte, t *testing.T) {
	outputs := []struct {
		path string
		data []byte
	}{
		{path: certFile, data: certContent},
		{path: keyFile, data: keyContent},
	}
	for _, out := range outputs {
		if err := os.WriteFile(out.path, out.data, 0600); err != nil {
			t.Fatalf("Unable to create the file %q: %v", out.path, err)
		}
	}
}
1068
1069 func getSingleCounterValueFromRegistry(t *testing.T, r metrics.Gatherer, name string) int {
1070 mfs, err := r.Gather()
1071 if err != nil {
1072 t.Logf("failed to gather local registry metrics: %v", err)
1073 return -1
1074 }
1075
1076 for _, mf := range mfs {
1077 if mf.Name != nil && *mf.Name == name {
1078 mfMetric := mf.GetMetric()
1079 for _, m := range mfMetric {
1080 if m.GetCounter() != nil {
1081 return int(m.GetCounter().GetValue())
1082 }
1083 }
1084 }
1085 }
1086
1087 return -1
1088 }
1089
// readTestFile returns the contents of testdata/<filename>, resolved against
// the current working directory. It panics if the file cannot be read.
func readTestFile(filename string) []byte {
	location := "testdata/" + filename
	contents, readErr := os.ReadFile(location)
	if readErr == nil {
		return contents
	}
	panic(readErr)
}
1097
1098
1099 func clientCert() []byte { return readTestFile("client.pem") }
1100
1101 func clientKey() []byte { return readTestFile("client-key.pem") }
1102
1103 func backendCertificate() []byte { return readTestFile("server.pem") }
1104
1105 func backendKey() []byte { return readTestFile("server-key.pem") }
1106
1107 func backendCaCertificate() []byte { return readTestFile("server-ca.pem") }
1108
1109 func clientCaCrt() []byte { return readTestFile("client-ca.pem") }
1110
View as plain text