     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package admissionwebhook
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"crypto/x509"
    23  	"encoding/json"
    24  	"fmt"
    25  	"io"
    26  	"net"
    27  	"net/http"
    28  	"net/url"
    29  	"sync"
    30  	"sync/atomic"
    31  	"testing"
    32  	"time"
    34  	"k8s.io/api/admission/v1beta1"
    35  	admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
    36  	corev1 "k8s.io/api/core/v1"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/types"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	"k8s.io/client-go/rest"
    42  	"k8s.io/kubernetes/cmd/kube-apiserver/app"
    43  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    44  	"k8s.io/kubernetes/test/integration/framework"
    45  )
    47  const (
    48  	testLoadBalanceClientUsername = "webhook-balance-integration-client"
    49  )
    51  type staticURLServiceResolver string
    53  func (u staticURLServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
    54  	return url.Parse(string(u))
    55  }
    57  // TestWebhookLoadBalance ensures that the admission webhook opens multiple connections to backends to satisfy concurrent requests
    58  func TestWebhookLoadBalance(t *testing.T) {
    59  	roots := x509.NewCertPool()
    60  	if !roots.AppendCertsFromPEM(localhostCert) {
    61  		t.Fatal("Failed to append Cert from PEM")
    62  	}
    63  	cert, err := tls.X509KeyPair(localhostCert, localhostKey)
    64  	if err != nil {
    65  		t.Fatalf("Failed to build cert with error: %+v", err)
    66  	}
    68  	tests := []struct {
    69  		name     string
    70  		http2    bool
    71  		expected int64
    72  	}{
    73  		{
    74  			name:     "10 connections when using http1",
    75  			http2:    false,
    76  			expected: 10,
    77  		},
    78  		{
    79  			name:     "1 connections when using http2",
    80  			http2:    true,
    81  			expected: 1,
    82  		},
    83  	}
    85  	for _, tc := range tests {
    86  		t.Run(tc.name, func(t *testing.T) {
    87  			localListener, err := net.Listen("tcp", "")
    88  			if err != nil {
    89  				if localListener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
    90  					t.Fatal(err)
    91  				}
    92  			}
    93  			trackingListener := &connectionTrackingListener{delegate: localListener}
    95  			recorder := &connectionRecorder{}
    96  			handler := newLoadBalanceWebhookHandler(recorder)
    97  			httpServer := &http.Server{
    98  				Handler: handler,
    99  				TLSConfig: &tls.Config{
   100  					RootCAs:      roots,
   101  					Certificates: []tls.Certificate{cert},
   102  				},
   103  			}
   104  			go func() {
   105  				_ = httpServer.ServeTLS(trackingListener, "", "")
   106  			}()
   107  			defer func() {
   108  				_ = httpServer.Close()
   109  			}()
   111  			webhookURL := "https://" + localListener.Addr().String()
   112  			t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(webhookURL)))
   114  			s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
   115  				"--disable-admission-plugins=ServiceAccount",
   116  			}, framework.SharedEtcd())
   117  			defer s.TearDownFn()
   119  			// Configure a client with a distinct user name so that it is easy to distinguish requests
   120  			// made by the client from requests made by controllers. We use this to filter out requests
   121  			// before recording them to ensure we don't accidentally mistake requests from controllers
   122  			// as requests made by the client.
   123  			clientConfig := rest.CopyConfig(s.ClientConfig)
   124  			clientConfig.QPS = 100
   125  			clientConfig.Burst = 200
   126  			clientConfig.Impersonate.UserName = testLoadBalanceClientUsername
   127  			clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"}
   128  			client, err := clientset.NewForConfig(clientConfig)
   129  			if err != nil {
   130  				t.Fatalf("unexpected error: %v", err)
   131  			}
   133  			_, err = client.CoreV1().Pods("default").Create(context.TODO(), loadBalanceMarkerFixture, metav1.CreateOptions{})
   134  			if err != nil {
   135  				t.Fatal(err)
   136  			}
   138  			upCh := recorder.Reset()
   139  			ns := "load-balance"
   140  			_, err = client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
   141  			if err != nil {
   142  				t.Fatal(err)
   143  			}
   145  			webhooksClientConfig := admissionregistrationv1.WebhookClientConfig{
   146  				CABundle: localhostCert,
   147  			}
   148  			if tc.http2 {
   149  				webhooksClientConfig.URL = &webhookURL
   150  			} else {
   151  				webhooksClientConfig.Service = &admissionregistrationv1.ServiceReference{
   152  					Namespace: "test",
   153  					Name:      "webhook",
   154  				}
   155  			}
   156  			fail := admissionregistrationv1.Fail
   157  			mutatingCfg, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.MutatingWebhookConfiguration{
   158  				ObjectMeta: metav1.ObjectMeta{Name: "admission.integration.test"},
   159  				Webhooks: []admissionregistrationv1.MutatingWebhook{{
   160  					Name:         "admission.integration.test",
   161  					ClientConfig: webhooksClientConfig,
   162  					Rules: []admissionregistrationv1.RuleWithOperations{{
   163  						Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.OperationAll},
   164  						Rule:       admissionregistrationv1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}},
   165  					}},
   166  					FailurePolicy:           &fail,
   167  					AdmissionReviewVersions: []string{"v1beta1"},
   168  					SideEffects:             &noSideEffects,
   169  				}},
   170  			}, metav1.CreateOptions{})
   171  			if err != nil {
   172  				t.Fatal(err)
   173  			}
   174  			defer func() {
   175  				err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(context.TODO(), mutatingCfg.GetName(), metav1.DeleteOptions{})
   176  				if err != nil {
   177  					t.Fatal(err)
   178  				}
   179  			}()
   181  			// wait until new webhook is called the first time
   182  			if err := wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*5, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
   183  				_, err = client.CoreV1().Pods("default").Patch(ctx, loadBalanceMarkerFixture.Name, types.JSONPatchType, []byte("[]"), metav1.PatchOptions{})
   184  				select {
   185  				case <-upCh:
   186  					return true, nil
   187  				default:
   188  					t.Logf("Waiting for webhook to become effective, getting marker object: %v", err)
   189  					return false, nil
   190  				}
   191  			}); err != nil {
   192  				t.Fatal(err)
   193  			}
   195  			pod := func() *corev1.Pod {
   196  				return &corev1.Pod{
   197  					ObjectMeta: metav1.ObjectMeta{
   198  						Namespace:    ns,
   199  						GenerateName: "loadbalance-",
   200  					},
   201  					Spec: corev1.PodSpec{
   202  						Containers: []corev1.Container{{
   203  							Name:  "fake-name",
   204  							Image: "fakeimage",
   205  						}},
   206  					},
   207  				}
   208  			}
   210  			// Submit 10 parallel requests
   211  			wg := &sync.WaitGroup{}
   212  			for i := 0; i < 10; i++ {
   213  				wg.Add(1)
   214  				go func() {
   215  					defer wg.Done()
   216  					_, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
   217  					if err != nil {
   218  						t.Error(err)
   219  					}
   220  				}()
   221  			}
   222  			wg.Wait()
   224  			actual := atomic.LoadInt64(&trackingListener.connections)
   225  			if tc.http2 && actual != tc.expected {
   226  				t.Errorf("expected %d connections, got %d", tc.expected, actual)
   227  			}
   228  			if !tc.http2 && actual < tc.expected {
   229  				t.Errorf("expected at least %d connections, got %d", tc.expected, actual)
   230  			}
   231  			trackingListener.Reset()
   233  			// Submit 10 more parallel requests
   234  			wg = &sync.WaitGroup{}
   235  			for i := 0; i < 10; i++ {
   236  				wg.Add(1)
   237  				go func() {
   238  					defer wg.Done()
   239  					_, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
   240  					if err != nil {
   241  						t.Error(err)
   242  					}
   243  				}()
   244  			}
   245  			wg.Wait()
   247  			if actual := atomic.LoadInt64(&trackingListener.connections); actual > 0 {
   248  				t.Errorf("expected no additional connections (reusing kept-alive connections), got %d", actual)
   249  			}
   250  		})
   251  	}
   253  }
   255  type connectionRecorder struct {
   256  	mu     sync.Mutex
   257  	upCh   chan struct{}
   258  	upOnce sync.Once
   259  }
   261  // Reset zeros out all counts and returns a channel that is closed when the first admission of the
   262  // marker object is received.
   263  func (i *connectionRecorder) Reset() chan struct{} {
   264  	i.mu.Lock()
   265  	defer i.mu.Unlock()
   266  	i.upCh = make(chan struct{})
   267  	i.upOnce = sync.Once{}
   268  	return i.upCh
   269  }
   271  func (i *connectionRecorder) MarkerReceived() {
   272  	i.mu.Lock()
   273  	defer i.mu.Unlock()
   274  	i.upOnce.Do(func() {
   275  		close(i.upCh)
   276  	})
   277  }
   279  func newLoadBalanceWebhookHandler(recorder *connectionRecorder) http.Handler {
   280  	allow := func(w http.ResponseWriter) {
   281  		w.Header().Set("Content-Type", "application/json")
   282  		json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
   283  			Response: &v1beta1.AdmissionResponse{
   284  				Allowed: true,
   285  			},
   286  		})
   287  	}
   288  	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   289  		fmt.Println(r.Proto)
   290  		defer r.Body.Close()
   291  		data, err := io.ReadAll(r.Body)
   292  		if err != nil {
   293  			http.Error(w, err.Error(), 400)
   294  		}
   295  		review := v1beta1.AdmissionReview{}
   296  		if err := json.Unmarshal(data, &review); err != nil {
   297  			http.Error(w, err.Error(), 400)
   298  		}
   299  		if review.Request.UserInfo.Username != testLoadBalanceClientUsername {
   300  			// skip requests not originating from this integration test's client
   301  			allow(w)
   302  			return
   303  		}
   305  		if len(review.Request.Object.Raw) == 0 {
   306  			http.Error(w, err.Error(), 400)
   307  		}
   308  		pod := &corev1.Pod{}
   309  		if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil {
   310  			http.Error(w, err.Error(), 400)
   311  		}
   313  		// When resetting between tests, a marker object is patched until this webhook
   314  		// observes it, at which point it is considered ready.
   315  		if pod.Namespace == loadBalanceMarkerFixture.Namespace && pod.Name == loadBalanceMarkerFixture.Name {
   316  			recorder.MarkerReceived()
   317  			allow(w)
   318  			return
   319  		}
   321  		// simulate a loaded backend
   322  		time.Sleep(2 * time.Second)
   323  		allow(w)
   324  	})
   325  }
   327  var loadBalanceMarkerFixture = &corev1.Pod{
   328  	ObjectMeta: metav1.ObjectMeta{
   329  		Namespace: "default",
   330  		Name:      "marker",
   331  	},
   332  	Spec: corev1.PodSpec{
   333  		Containers: []corev1.Container{{
   334  			Name:  "fake-name",
   335  			Image: "fakeimage",
   336  		}},
   337  	},
   338  }
   340  type connectionTrackingListener struct {
   341  	connections int64
   342  	delegate    net.Listener
   343  }
   345  func (c *connectionTrackingListener) Reset() {
   346  	atomic.StoreInt64(&c.connections, 0)
   347  }
   349  func (c *connectionTrackingListener) Accept() (net.Conn, error) {
   350  	conn, err := c.delegate.Accept()
   351  	if err == nil {
   352  		atomic.AddInt64(&c.connections, 1)
   353  	}
   354  	return conn, err
   355  }
   356  func (c *connectionTrackingListener) Close() error {
   357  	return c.delegate.Close()
   358  }
   359  func (c *connectionTrackingListener) Addr() net.Addr {
   360  	return c.delegate.Addr()
   361  }

