1
16
17 package controlplane
18
19 import (
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "net/http"
25 "reflect"
26 "strings"
27 "sync"
28 "testing"
29 "time"
30
31 "k8s.io/apiextensions-apiserver/test/integration/fixtures"
32
33 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
34
35 appsv1 "k8s.io/api/apps/v1"
36 corev1 "k8s.io/api/core/v1"
37 apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
38 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
39 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40 "k8s.io/apimachinery/pkg/util/intstr"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/client-go/kubernetes"
43 "k8s.io/kube-aggregator/pkg/apis/apiregistration"
44 "k8s.io/kube-openapi/pkg/validation/spec"
45 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
46 "k8s.io/kubernetes/test/integration/etcd"
47 "k8s.io/kubernetes/test/integration/framework"
48 )
49
50 const (
51
52
53 testApiextensionsOverlapProbeString = "testApiextensionsOverlapProbeField"
54 )
55
56 func TestRun(t *testing.T) {
57 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
58 defer server.TearDownFn()
59
60 client, err := kubernetes.NewForConfig(server.ClientConfig)
61 if err != nil {
62 t.Fatalf("unexpected error: %v", err)
63 }
64
65
66 t.Logf("Creating Deployment directly after being healthy")
67 var replicas int32 = 1
68 _, err = client.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{
69 TypeMeta: metav1.TypeMeta{
70 Kind: "Deployment",
71 APIVersion: "apps/v1",
72 },
73 ObjectMeta: metav1.ObjectMeta{
74 Namespace: "default",
75 Name: "test",
76 Labels: map[string]string{"foo": "bar"},
77 },
78 Spec: appsv1.DeploymentSpec{
79 Replicas: &replicas,
80 Strategy: appsv1.DeploymentStrategy{
81 Type: appsv1.RollingUpdateDeploymentStrategyType,
82 },
83 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
84 Template: corev1.PodTemplateSpec{
85 ObjectMeta: metav1.ObjectMeta{
86 Labels: map[string]string{"foo": "bar"},
87 },
88 Spec: corev1.PodSpec{
89 Containers: []corev1.Container{
90 {
91 Name: "foo",
92 Image: "foo",
93 },
94 },
95 },
96 },
97 },
98 }, metav1.CreateOptions{})
99 if err != nil {
100 t.Fatalf("Failed to create deployment: %v", err)
101 }
102 }
103
104 func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) (bool, error) {
105 res := client.CoreV1().RESTClient().Get().RequestURI(path).Do(context.TODO())
106 var status int
107 res.StatusCode(&status)
108 _, err := res.Raw()
109 if err != nil {
110 return false, err
111 }
112 return status == http.StatusOK, nil
113 }
114
115 func TestLivezAndReadyz(t *testing.T) {
116 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--livez-grace-period", "0s"}, framework.SharedEtcd())
117 defer server.TearDownFn()
118
119 client, err := kubernetes.NewForConfig(server.ClientConfig)
120 if err != nil {
121 t.Fatalf("unexpected error: %v", err)
122 }
123 if statusOK, err := endpointReturnsStatusOK(client, "/livez"); err != nil || !statusOK {
124 t.Fatalf("livez should be healthy, got %v and error %v", statusOK, err)
125 }
126 if statusOK, err := endpointReturnsStatusOK(client, "/readyz"); err != nil || !statusOK {
127 t.Fatalf("readyz should be healthy, got %v and error %v", statusOK, err)
128 }
129 }
130
131
132
133
134
135 func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
136 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
137 defer server.TearDownFn()
138
139 kubeclient, err := kubernetes.NewForConfig(server.ClientConfig)
140 if err != nil {
141 t.Fatalf("unexpected error: %v", err)
142 }
143
144 result := kubeclient.RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO())
145 status := 0
146 result.StatusCode(&status)
147 if status != http.StatusOK {
148 t.Fatalf("GET /openapi/v2 failed: expected status=%d, got=%d", http.StatusOK, status)
149 }
150
151 raw, err := result.Raw()
152 if err != nil {
153 t.Fatalf("Unexpected error: %v", err)
154 }
155
156 type openAPISchema struct {
157 Paths map[string]interface{} `json:"paths"`
158 }
159
160 var doc openAPISchema
161 err = json.Unmarshal(raw, &doc)
162 if err != nil {
163 t.Fatalf("Failed to unmarshal: %v", err)
164 }
165
166 matchedExtension := false
167 extensionsPrefix := "/apis/" + apiextensionsv1beta1.GroupName
168
169 matchedRegistration := false
170 registrationPrefix := "/apis/" + apiregistration.GroupName
171
172 for path := range doc.Paths {
173 if strings.HasPrefix(path, extensionsPrefix) {
174 matchedExtension = true
175 }
176 if strings.HasPrefix(path, registrationPrefix) {
177 matchedRegistration = true
178 }
179 if matchedExtension && matchedRegistration {
180 return
181 }
182 }
183
184 if !matchedExtension {
185 t.Errorf("missing path: %q", extensionsPrefix)
186 }
187
188 if !matchedRegistration {
189 t.Errorf("missing path: %q", registrationPrefix)
190 }
191 }
192
193 func TestOpenAPIApiextensionsOverlapProtection(t *testing.T) {
194 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
195 defer server.TearDownFn()
196 apiextensionsclient, err := apiextensionsclientset.NewForConfig(server.ClientConfig)
197 if err != nil {
198 t.Fatalf("unexpected error: %v", err)
199 }
200 crdPath, exist, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1/customresourcedefinitions/{name}`)
201 if err != nil {
202 t.Fatalf("unexpected error getting CRD OpenAPI path: %v", err)
203 }
204 if !exist {
205 t.Fatalf("unexpected error: apiextensions OpenAPI path doesn't exist")
206 }
207
208
209 crd := &apiextensionsv1.CustomResourceDefinition{
210 ObjectMeta: metav1.ObjectMeta{
211 Name: "customresourcedefinitions.apiextensions.k8s.io",
212 Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
213 },
214 Spec: apiextensionsv1.CustomResourceDefinitionSpec{
215 Group: "apiextensions.k8s.io",
216 Scope: apiextensionsv1.ClusterScoped,
217 Names: apiextensionsv1.CustomResourceDefinitionNames{
218 Plural: "customresourcedefinitions",
219 Singular: "customresourcedefinition",
220 Kind: "CustomResourceDefinition",
221 ListKind: "CustomResourceDefinitionList",
222 },
223 Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
224 {
225 Name: "v1",
226 Served: true,
227 Storage: true,
228 Schema: &apiextensionsv1.CustomResourceValidation{
229 OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
230 Type: "object",
231 Properties: map[string]apiextensionsv1.JSONSchemaProps{
232 testApiextensionsOverlapProbeString: {Type: "boolean"},
233 },
234 },
235 },
236 },
237 },
238 },
239 }
240 etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
241
242
243 if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "foo"); err != nil {
244 t.Fatalf("unexpected error: %v", err)
245 }
246
247
248 path, _, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1/customresourcedefinitions/{name}`)
249 if err != nil {
250 t.Fatalf("unexpected error: %v", err)
251 }
252 pathBytes, err := json.Marshal(path)
253 if err != nil {
254 t.Fatalf("unexpected error: %v", err)
255 }
256 crdPathBytes, err := json.Marshal(crdPath)
257 if err != nil {
258 t.Fatalf("unexpected error: %v", err)
259 }
260 if !bytes.Equal(pathBytes, crdPathBytes) {
261 t.Fatalf("expected CRD OpenAPI path to not change, but got different results: want %q, got %q", string(crdPathBytes), string(pathBytes))
262 }
263
264
265 exist, err = specHasProbe(apiextensionsclient, testApiextensionsOverlapProbeString)
266 if err != nil {
267 t.Fatalf("unexpected error: %v", err)
268 }
269 if exist {
270 t.Fatalf("unexpected error: orphan definition isn't pruned")
271 }
272
273
274 crd = &apiextensionsv1.CustomResourceDefinition{
275 ObjectMeta: metav1.ObjectMeta{
276 Name: "customresourcedefinitions.apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
277 Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
278 },
279 Spec: apiextensionsv1.CustomResourceDefinitionSpec{
280 Group: "apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
281 Scope: apiextensionsv1.ClusterScoped,
282 Names: apiextensionsv1.CustomResourceDefinitionNames{
283 Plural: "customresourcedefinitions",
284 Singular: "customresourcedefinition",
285 Kind: "CustomResourceDefinition",
286 ListKind: "CustomResourceDefinitionList",
287 },
288 Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
289 {
290 Name: "v1",
291 Served: true,
292 Storage: true,
293 Schema: &apiextensionsv1.CustomResourceValidation{
294 OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
295 Type: "object",
296 Properties: map[string]apiextensionsv1.JSONSchemaProps{
297 testApiextensionsOverlapProbeString: {Type: "boolean"},
298 },
299 },
300 },
301 },
302 },
303 },
304 }
305 etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
306
307
308 if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "bar"); err != nil {
309 t.Fatalf("unexpected error: %v", err)
310 }
311
312
313 apiextensionsDefinition, exist, err := getOpenAPIDefinition(apiextensionsclient, `io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinition`)
314 if err != nil {
315 t.Fatalf("unexpected error: %v", err)
316 }
317 if !exist {
318 t.Fatalf("unexpected error: apiextensions definition doesn't exist")
319 }
320 bytes, err := json.Marshal(apiextensionsDefinition)
321 if err != nil {
322 t.Fatalf("unexpected error: %v", err)
323 }
324 if exist := strings.Contains(string(bytes), testApiextensionsOverlapProbeString); exist {
325 t.Fatalf("unexpected error: apiextensions definition gets overlapped")
326 }
327 }
328
329
330
331 func triggerSpecUpdateWithProbeCRD(t *testing.T, apiextensionsclient *apiextensionsclientset.Clientset, suffix string) error {
332
333 name := fmt.Sprintf("integration-test-%s-crd", suffix)
334 kind := fmt.Sprintf("Integration-test-%s-crd", suffix)
335 group := "probe.test.com"
336 crd := &apiextensionsv1.CustomResourceDefinition{
337 ObjectMeta: metav1.ObjectMeta{Name: name + "s." + group},
338 Spec: apiextensionsv1.CustomResourceDefinitionSpec{
339 Group: group,
340 Scope: apiextensionsv1.ClusterScoped,
341 Names: apiextensionsv1.CustomResourceDefinitionNames{
342 Plural: name + "s",
343 Singular: name,
344 Kind: kind,
345 ListKind: kind + "List",
346 },
347 Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
348 {
349 Name: "v1",
350 Served: true,
351 Storage: true,
352 Schema: fixtures.AllowAllSchema(),
353 },
354 },
355 },
356 }
357 etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
358
359
360
361 if err := wait.Poll(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
362 _, exist, err := getOpenAPIPath(apiextensionsclient, fmt.Sprintf(`/apis/%s/v1/%ss/{name}`, group, name))
363 if err != nil {
364 return false, err
365 }
366 return exist, nil
367 }); err != nil {
368 return fmt.Errorf("failed to observe probe CRD path in the spec: %v", err)
369 }
370 return nil
371 }
372
373 func specHasProbe(clientset *apiextensionsclientset.Clientset, probe string) (bool, error) {
374 bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
375 if err != nil {
376 return false, err
377 }
378 return strings.Contains(string(bs), probe), nil
379 }
380
381 func getOpenAPIPath(clientset *apiextensionsclientset.Clientset, path string) (spec.PathItem, bool, error) {
382 bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
383 if err != nil {
384 return spec.PathItem{}, false, err
385 }
386 s := spec.Swagger{}
387 if err := json.Unmarshal(bs, &s); err != nil {
388 return spec.PathItem{}, false, err
389 }
390 if s.SwaggerProps.Paths == nil {
391 return spec.PathItem{}, false, fmt.Errorf("unexpected empty path")
392 }
393 value, ok := s.SwaggerProps.Paths.Paths[path]
394 return value, ok, nil
395 }
396
397 func getOpenAPIDefinition(clientset *apiextensionsclientset.Clientset, definition string) (spec.Schema, bool, error) {
398 bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
399 if err != nil {
400 return spec.Schema{}, false, err
401 }
402 s := spec.Swagger{}
403 if err := json.Unmarshal(bs, &s); err != nil {
404 return spec.Schema{}, false, err
405 }
406 if s.SwaggerProps.Definitions == nil {
407 return spec.Schema{}, false, fmt.Errorf("unexpected empty path")
408 }
409 value, ok := s.SwaggerProps.Definitions[definition]
410 return value, ok, nil
411 }
412
413
414 func getEndpointIPs(endpoints *corev1.Endpoints) []string {
415 endpointMap := make(map[string]bool)
416 ips := make([]string, 0)
417 for _, subset := range endpoints.Subsets {
418 for _, address := range subset.Addresses {
419 if _, ok := endpointMap[address.IP]; !ok {
420 endpointMap[address.IP] = true
421 ips = append(ips, address.IP)
422 }
423 }
424 }
425 return ips
426 }
427
428 func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool {
429 listenAddresses := make([]string, 0)
430 for _, server := range servers {
431 listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String())
432 }
433 return reflect.DeepEqual(listenAddresses, ips)
434 }
435
436 func testReconcilersAPIServerLease(t *testing.T, leaseCount int, apiServerCount int) {
437 var leaseServers = make([]*kubeapiservertesting.TestServer, leaseCount)
438 var apiServerCountServers = make([]*kubeapiservertesting.TestServer, apiServerCount)
439 etcd := framework.SharedEtcd()
440
441 instanceOptions := kubeapiservertesting.NewDefaultTestServerOptions()
442
443 wg := sync.WaitGroup{}
444
445 for i := 0; i < apiServerCount; i++ {
446
447 wg.Add(1)
448 go func(i int) {
449 defer wg.Done()
450 server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
451 "--endpoint-reconciler-type", "master-count",
452 "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
453 "--apiserver-count", fmt.Sprintf("%v", apiServerCount),
454 }, etcd)
455 apiServerCountServers[i] = server
456 }(i)
457 }
458 wg.Wait()
459
460
461 if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
462 client, err := kubernetes.NewForConfig(apiServerCountServers[0].ClientConfig)
463 if err != nil {
464 t.Logf("error creating client: %v", err)
465 return false, nil
466 }
467 endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
468 if err != nil {
469 t.Logf("error fetching endpoints: %v", err)
470 return false, nil
471 }
472 return verifyEndpointsWithIPs(apiServerCountServers, getEndpointIPs(endpoints)), nil
473 }); err != nil {
474 t.Fatalf("API Server count endpoints failed to register: %v", err)
475 }
476
477
478 for i := 0; i < leaseCount; i++ {
479 wg.Add(1)
480 go func(i int) {
481 defer wg.Done()
482 options := []string{
483 "--endpoint-reconciler-type", "lease",
484 "--advertise-address", fmt.Sprintf("10.0.1.%v", i+10),
485 }
486 server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd)
487 leaseServers[i] = server
488 }(i)
489 }
490 wg.Wait()
491 defer func() {
492 for i := 0; i < leaseCount; i++ {
493 leaseServers[i].TearDownFn()
494 }
495 }()
496
497 time.Sleep(3 * time.Second)
498
499
500 for _, server := range apiServerCountServers {
501 server.TearDownFn()
502 }
503
504
505 if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
506 client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig)
507 if err != nil {
508 t.Logf("create client error: %v", err)
509 return false, nil
510 }
511 endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
512 if err != nil {
513 t.Logf("error fetching endpoints: %v", err)
514 return false, nil
515 }
516 return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil
517 }); err != nil {
518 t.Fatalf("did not find only lease endpoints: %v", err)
519 }
520 }
521
522 func TestReconcilerAPIServerLeaseCombined(t *testing.T) {
523 testReconcilersAPIServerLease(t, 1, 2)
524 }
525
526 func TestReconcilerAPIServerLeaseMultiMoreAPIServers(t *testing.T) {
527 testReconcilersAPIServerLease(t, 2, 1)
528 }
529
530 func TestReconcilerAPIServerLeaseMultiCombined(t *testing.T) {
531 testReconcilersAPIServerLease(t, 2, 2)
532 }
533
534 func TestMultiAPIServerNodePortAllocation(t *testing.T) {
535 var kubeAPIServers []*kubeapiservertesting.TestServer
536 var clientAPIServers []*kubernetes.Clientset
537 etcd := framework.SharedEtcd()
538
539 instanceOptions := kubeapiservertesting.NewDefaultTestServerOptions()
540
541
542 for i := 0; i < 2; i++ {
543
544 t.Logf("starting api server: %d", i)
545 server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
546 "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
547 }, etcd)
548 kubeAPIServers = append(kubeAPIServers, server)
549
550
551 if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
552 client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
553 if err != nil {
554 t.Logf("create client error: %v", err)
555 return false, nil
556 }
557 clientAPIServers = append(clientAPIServers, client)
558 endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
559 if err != nil {
560 t.Logf("error fetching endpoints: %v", err)
561 return false, nil
562 }
563 return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
564 }); err != nil {
565 t.Fatalf("did not find only lease endpoints: %v", err)
566 }
567 }
568
569 serviceObject := &corev1.Service{
570 ObjectMeta: metav1.ObjectMeta{
571 Labels: map[string]string{"foo": "bar"},
572 Name: "test-node-port",
573 },
574 Spec: corev1.ServiceSpec{
575 Ports: []corev1.ServicePort{
576 {
577 Name: "nodeport-test",
578 Port: 443,
579 TargetPort: intstr.IntOrString{IntVal: 443},
580 NodePort: 32080,
581 Protocol: "TCP",
582 },
583 },
584 Type: "NodePort",
585 Selector: map[string]string{"foo": "bar"},
586 },
587 }
588
589
590
591 for i := 0; i < 2; i++ {
592
593 _, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{})
594 if err != nil {
595 t.Fatalf("unable to create service: %v", err)
596 }
597
598 if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
599 t.Fatalf("got unexpected error: %v", err)
600 }
601 }
602
603
604 for _, server := range kubeAPIServers {
605 server.TearDownFn()
606 }
607
608 }
609
View as plain text