1
16
17 package admissionwebhook
18
19 import (
20 "context"
21 "crypto/tls"
22 "crypto/x509"
23 "encoding/json"
24 "fmt"
25 "io"
26 "net"
27 "net/http"
28 "net/url"
29 "sync"
30 "sync/atomic"
31 "testing"
32 "time"
33
34 "k8s.io/api/admission/v1beta1"
35 admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
36 corev1 "k8s.io/api/core/v1"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/types"
39 "k8s.io/apimachinery/pkg/util/wait"
40 clientset "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
42 "k8s.io/kubernetes/cmd/kube-apiserver/app"
43 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
44 "k8s.io/kubernetes/test/integration/framework"
45 )
46
// testLoadBalanceClientUsername is the user name the test client impersonates.
// The webhook handler compares the requesting user against it and immediately
// allows every request that comes from anyone else.
const testLoadBalanceClientUsername = "webhook-balance-integration-client"
50
// staticURLServiceResolver resolves every service reference to one fixed URL:
// the string value of the resolver itself.
type staticURLServiceResolver string

// ResolveEndpoint parses the resolver's string value as a URL and returns it.
// The namespace, name and port arguments are ignored.
func (r staticURLServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
	endpoint, err := url.Parse(string(r))
	return endpoint, err
}
56
57
58 func TestWebhookLoadBalance(t *testing.T) {
59 roots := x509.NewCertPool()
60 if !roots.AppendCertsFromPEM(localhostCert) {
61 t.Fatal("Failed to append Cert from PEM")
62 }
63 cert, err := tls.X509KeyPair(localhostCert, localhostKey)
64 if err != nil {
65 t.Fatalf("Failed to build cert with error: %+v", err)
66 }
67
68 tests := []struct {
69 name string
70 http2 bool
71 expected int64
72 }{
73 {
74 name: "10 connections when using http1",
75 http2: false,
76 expected: 10,
77 },
78 {
79 name: "1 connections when using http2",
80 http2: true,
81 expected: 1,
82 },
83 }
84
85 for _, tc := range tests {
86 t.Run(tc.name, func(t *testing.T) {
87 localListener, err := net.Listen("tcp", "127.0.0.1:0")
88 if err != nil {
89 if localListener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
90 t.Fatal(err)
91 }
92 }
93 trackingListener := &connectionTrackingListener{delegate: localListener}
94
95 recorder := &connectionRecorder{}
96 handler := newLoadBalanceWebhookHandler(recorder)
97 httpServer := &http.Server{
98 Handler: handler,
99 TLSConfig: &tls.Config{
100 RootCAs: roots,
101 Certificates: []tls.Certificate{cert},
102 },
103 }
104 go func() {
105 _ = httpServer.ServeTLS(trackingListener, "", "")
106 }()
107 defer func() {
108 _ = httpServer.Close()
109 }()
110
111 webhookURL := "https://" + localListener.Addr().String()
112 t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(webhookURL)))
113
114 s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
115 "--disable-admission-plugins=ServiceAccount",
116 }, framework.SharedEtcd())
117 defer s.TearDownFn()
118
119
120
121
122
123 clientConfig := rest.CopyConfig(s.ClientConfig)
124 clientConfig.QPS = 100
125 clientConfig.Burst = 200
126 clientConfig.Impersonate.UserName = testLoadBalanceClientUsername
127 clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"}
128 client, err := clientset.NewForConfig(clientConfig)
129 if err != nil {
130 t.Fatalf("unexpected error: %v", err)
131 }
132
133 _, err = client.CoreV1().Pods("default").Create(context.TODO(), loadBalanceMarkerFixture, metav1.CreateOptions{})
134 if err != nil {
135 t.Fatal(err)
136 }
137
138 upCh := recorder.Reset()
139 ns := "load-balance"
140 _, err = client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
141 if err != nil {
142 t.Fatal(err)
143 }
144
145 webhooksClientConfig := admissionregistrationv1.WebhookClientConfig{
146 CABundle: localhostCert,
147 }
148 if tc.http2 {
149 webhooksClientConfig.URL = &webhookURL
150 } else {
151 webhooksClientConfig.Service = &admissionregistrationv1.ServiceReference{
152 Namespace: "test",
153 Name: "webhook",
154 }
155 }
156 fail := admissionregistrationv1.Fail
157 mutatingCfg, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.MutatingWebhookConfiguration{
158 ObjectMeta: metav1.ObjectMeta{Name: "admission.integration.test"},
159 Webhooks: []admissionregistrationv1.MutatingWebhook{{
160 Name: "admission.integration.test",
161 ClientConfig: webhooksClientConfig,
162 Rules: []admissionregistrationv1.RuleWithOperations{{
163 Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.OperationAll},
164 Rule: admissionregistrationv1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}},
165 }},
166 FailurePolicy: &fail,
167 AdmissionReviewVersions: []string{"v1beta1"},
168 SideEffects: &noSideEffects,
169 }},
170 }, metav1.CreateOptions{})
171 if err != nil {
172 t.Fatal(err)
173 }
174 defer func() {
175 err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(context.TODO(), mutatingCfg.GetName(), metav1.DeleteOptions{})
176 if err != nil {
177 t.Fatal(err)
178 }
179 }()
180
181
182 if err := wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*5, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
183 _, err = client.CoreV1().Pods("default").Patch(ctx, loadBalanceMarkerFixture.Name, types.JSONPatchType, []byte("[]"), metav1.PatchOptions{})
184 select {
185 case <-upCh:
186 return true, nil
187 default:
188 t.Logf("Waiting for webhook to become effective, getting marker object: %v", err)
189 return false, nil
190 }
191 }); err != nil {
192 t.Fatal(err)
193 }
194
195 pod := func() *corev1.Pod {
196 return &corev1.Pod{
197 ObjectMeta: metav1.ObjectMeta{
198 Namespace: ns,
199 GenerateName: "loadbalance-",
200 },
201 Spec: corev1.PodSpec{
202 Containers: []corev1.Container{{
203 Name: "fake-name",
204 Image: "fakeimage",
205 }},
206 },
207 }
208 }
209
210
211 wg := &sync.WaitGroup{}
212 for i := 0; i < 10; i++ {
213 wg.Add(1)
214 go func() {
215 defer wg.Done()
216 _, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
217 if err != nil {
218 t.Error(err)
219 }
220 }()
221 }
222 wg.Wait()
223
224 actual := atomic.LoadInt64(&trackingListener.connections)
225 if tc.http2 && actual != tc.expected {
226 t.Errorf("expected %d connections, got %d", tc.expected, actual)
227 }
228 if !tc.http2 && actual < tc.expected {
229 t.Errorf("expected at least %d connections, got %d", tc.expected, actual)
230 }
231 trackingListener.Reset()
232
233
234 wg = &sync.WaitGroup{}
235 for i := 0; i < 10; i++ {
236 wg.Add(1)
237 go func() {
238 defer wg.Done()
239 _, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
240 if err != nil {
241 t.Error(err)
242 }
243 }()
244 }
245 wg.Wait()
246
247 if actual := atomic.LoadInt64(&trackingListener.connections); actual > 0 {
248 t.Errorf("expected no additional connections (reusing kept-alive connections), got %d", actual)
249 }
250 })
251 }
252
253 }
254
// connectionRecorder lets the webhook handler signal that it has received a
// request for the marker pod. Waiters obtain a channel from Reset; the channel
// is closed by the first MarkerReceived call that follows that Reset.
type connectionRecorder struct {
	mu     sync.Mutex
	upCh   chan struct{}
	upOnce sync.Once
}

// Reset installs a fresh, open channel, re-arms the one-shot close and returns
// the new channel. Channels returned by earlier calls are no longer closed by
// later MarkerReceived calls.
func (i *connectionRecorder) Reset() chan struct{} {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.upCh = make(chan struct{})
	i.upOnce = sync.Once{}
	return i.upCh
}

// MarkerReceived closes the channel returned by the most recent Reset. Only the
// first call after a Reset has an effect. Calling it before any Reset is a
// no-op.
func (i *connectionRecorder) MarkerReceived() {
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.upCh == nil {
		// No Reset yet, so nobody can be waiting; closing a nil channel would panic.
		return
	}
	i.upOnce.Do(func() {
		close(i.upCh)
	})
}
278
279 func newLoadBalanceWebhookHandler(recorder *connectionRecorder) http.Handler {
280 allow := func(w http.ResponseWriter) {
281 w.Header().Set("Content-Type", "application/json")
282 json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
283 Response: &v1beta1.AdmissionResponse{
284 Allowed: true,
285 },
286 })
287 }
288 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
289 fmt.Println(r.Proto)
290 defer r.Body.Close()
291 data, err := io.ReadAll(r.Body)
292 if err != nil {
293 http.Error(w, err.Error(), 400)
294 }
295 review := v1beta1.AdmissionReview{}
296 if err := json.Unmarshal(data, &review); err != nil {
297 http.Error(w, err.Error(), 400)
298 }
299 if review.Request.UserInfo.Username != testLoadBalanceClientUsername {
300
301 allow(w)
302 return
303 }
304
305 if len(review.Request.Object.Raw) == 0 {
306 http.Error(w, err.Error(), 400)
307 }
308 pod := &corev1.Pod{}
309 if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil {
310 http.Error(w, err.Error(), 400)
311 }
312
313
314
315 if pod.Namespace == loadBalanceMarkerFixture.Namespace && pod.Name == loadBalanceMarkerFixture.Name {
316 recorder.MarkerReceived()
317 allow(w)
318 return
319 }
320
321
322 time.Sleep(2 * time.Second)
323 allow(w)
324 })
325 }
326
327 var loadBalanceMarkerFixture = &corev1.Pod{
328 ObjectMeta: metav1.ObjectMeta{
329 Namespace: "default",
330 Name: "marker",
331 },
332 Spec: corev1.PodSpec{
333 Containers: []corev1.Container{{
334 Name: "fake-name",
335 Image: "fakeimage",
336 }},
337 },
338 }
339
// connectionTrackingListener wraps a net.Listener and counts the connections
// that were accepted successfully through it.
type connectionTrackingListener struct {
	// connections is read and written with sync/atomic only.
	connections int64
	delegate    net.Listener
}

// Reset sets the accepted-connection counter back to zero.
func (l *connectionTrackingListener) Reset() {
	atomic.StoreInt64(&l.connections, 0)
}

// Accept forwards to the wrapped listener and increments the counter when the
// wrapped Accept returns without an error. The delegate's results are returned
// unchanged.
func (l *connectionTrackingListener) Accept() (net.Conn, error) {
	accepted, acceptErr := l.delegate.Accept()
	if acceptErr != nil {
		return accepted, acceptErr
	}
	atomic.AddInt64(&l.connections, 1)
	return accepted, nil
}

// Close closes the wrapped listener.
func (l *connectionTrackingListener) Close() error {
	return l.delegate.Close()
}

// Addr reports the address of the wrapped listener.
func (l *connectionTrackingListener) Addr() net.Addr {
	return l.delegate.Addr()
}
362
View as plain text