1
16
17 package controlplane
18
19 import (
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "io"
25 "net/http"
26 "os"
27 "path"
28 "strconv"
29 "strings"
30 "sync"
31 "testing"
32 "time"
33
34 "sigs.k8s.io/yaml"
35
36 appsv1 "k8s.io/api/apps/v1"
37 corev1 "k8s.io/api/core/v1"
38 apierrors "k8s.io/apimachinery/pkg/api/errors"
39 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40 "k8s.io/apimachinery/pkg/runtime"
41 "k8s.io/apimachinery/pkg/util/wait"
42 clientset "k8s.io/client-go/kubernetes"
43 clienttypedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
44 restclient "k8s.io/client-go/rest"
45 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
46 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
47 "k8s.io/kubernetes/test/integration"
48 "k8s.io/kubernetes/test/integration/framework"
49 "k8s.io/kubernetes/test/utils/ktesting"
50 )
51
52 const (
53
54 AliceToken string = "abc123"
55 BobToken string = "xyz987"
56 )
57
58 func testPrefix(t *testing.T, prefix string) {
59 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
60 defer server.TearDownFn()
61
62 transport, err := restclient.TransportFor(server.ClientConfig)
63 if err != nil {
64 t.Fatal(err)
65 }
66 req, err := http.NewRequest("GET", server.ClientConfig.Host+prefix, nil)
67 if err != nil {
68 t.Fatalf("couldn't create a request: %v", err)
69 }
70
71 resp, err := transport.RoundTrip(req)
72 if err != nil {
73 t.Fatalf("unexpected error getting %s prefix: %v", prefix, err)
74 }
75 defer resp.Body.Close()
76 if resp.StatusCode != http.StatusOK {
77 t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
78 }
79 }
80
81 func TestAutoscalingPrefix(t *testing.T) {
82 testPrefix(t, "/apis/autoscaling/")
83 }
84
85 func TestBatchPrefix(t *testing.T) {
86 testPrefix(t, "/apis/batch/")
87 }
88
89 func TestAppsPrefix(t *testing.T) {
90 testPrefix(t, "/apis/apps/")
91 }
92
93 func TestKubernetesService(t *testing.T) {
94 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
95 defer server.TearDownFn()
96
97 coreClient := clientset.NewForConfigOrDie(server.ClientConfig)
98 err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
99 if _, err := coreClient.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}); err != nil && apierrors.IsNotFound(err) {
100 return false, nil
101 } else if err != nil {
102 return false, err
103 }
104 return true, nil
105 })
106 if err != nil {
107 t.Fatalf("Expected kubernetes service to exist, got: %v", err)
108 }
109 }
110
111 func TestEmptyList(t *testing.T) {
112 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
113 defer server.TearDownFn()
114
115 transport, err := restclient.TransportFor(server.ClientConfig)
116 if err != nil {
117 t.Fatal(err)
118 }
119
120 u := server.ClientConfig.Host + "/api/v1/namespaces/default/pods"
121 req, err := http.NewRequest("GET", u, nil)
122 if err != nil {
123 t.Fatalf("couldn't create a request: %v", err)
124 }
125
126 resp, err := transport.RoundTrip(req)
127 if err != nil {
128 t.Fatalf("unexpected error getting response: %v", err)
129 }
130 defer resp.Body.Close()
131 if resp.StatusCode != http.StatusOK {
132 t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
133 }
134 data, _ := io.ReadAll(resp.Body)
135 decodedData := map[string]interface{}{}
136 if err := json.Unmarshal(data, &decodedData); err != nil {
137 t.Logf("body: %s", string(data))
138 t.Fatalf("got error decoding data: %v", err)
139 }
140 if items, ok := decodedData["items"]; !ok {
141 t.Logf("body: %s", string(data))
142 t.Fatalf("missing items field in empty list (all lists should return an items field)")
143 } else if items == nil {
144 t.Logf("body: %s", string(data))
145 t.Fatalf("nil items field from empty list (all lists should return non-nil empty items lists)")
146 }
147 }
148
149 func initStatusForbiddenControlPlaneConfig(options *options.ServerRunOptions) {
150 options.Authorization.Modes = []string{"AlwaysDeny"}
151 }
152
153 func initUnauthorizedControlPlaneConfig(options *options.ServerRunOptions) {
154 options.Authentication.Anonymous.Allow = false
155 }
156
157 func TestStatus(t *testing.T) {
158 testCases := []struct {
159 name string
160 modifyOptions func(*options.ServerRunOptions)
161 statusCode int
162 reqPath string
163 reason string
164 message string
165 }{
166 {
167 name: "404",
168 statusCode: http.StatusNotFound,
169 reqPath: "/apis/batch/v1/namespaces/default/jobs/foo",
170 reason: "NotFound",
171 message: `jobs.batch "foo" not found`,
172 },
173 {
174 name: "403",
175 modifyOptions: initStatusForbiddenControlPlaneConfig,
176 statusCode: http.StatusForbidden,
177 reqPath: "/apis",
178 reason: "Forbidden",
179 message: `forbidden: User "system:anonymous" cannot get path "/apis": Everything is forbidden.`,
180 },
181 {
182 name: "401",
183 modifyOptions: initUnauthorizedControlPlaneConfig,
184 statusCode: http.StatusUnauthorized,
185 reqPath: "/apis",
186 reason: "Unauthorized",
187 message: `Unauthorized`,
188 },
189 }
190
191 for _, tc := range testCases {
192 t.Run(tc.name, func(t *testing.T) {
193 tCtx := ktesting.Init(t)
194 _, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
195 ModifyServerRunOptions: func(options *options.ServerRunOptions) {
196 if tc.modifyOptions != nil {
197 tc.modifyOptions(options)
198 }
199 },
200 })
201 defer tearDownFn()
202
203
204
205 if tc.modifyOptions != nil {
206 kubeConfig.BearerToken = ""
207 }
208 transport, err := restclient.TransportFor(kubeConfig)
209 if err != nil {
210 t.Fatal(err)
211 }
212
213 req, err := http.NewRequest("GET", kubeConfig.Host+tc.reqPath, nil)
214 if err != nil {
215 t.Fatalf("unexpected error: %v", err)
216 }
217 resp, err := transport.RoundTrip(req)
218 if err != nil {
219 t.Fatalf("unexpected error: %v", err)
220 }
221 defer resp.Body.Close()
222
223 if resp.StatusCode != tc.statusCode {
224 t.Fatalf("got status %v instead of %s", resp.StatusCode, tc.name)
225 }
226 data, _ := io.ReadAll(resp.Body)
227 decodedData := map[string]interface{}{}
228 if err := json.Unmarshal(data, &decodedData); err != nil {
229 t.Logf("body: %s", string(data))
230 t.Fatalf("got error decoding data: %v", err)
231 }
232 t.Logf("body: %s", string(data))
233
234 if got, expected := decodedData["apiVersion"], "v1"; got != expected {
235 t.Errorf("unexpected apiVersion %q, expected %q", got, expected)
236 }
237 if got, expected := decodedData["kind"], "Status"; got != expected {
238 t.Errorf("unexpected kind %q, expected %q", got, expected)
239 }
240 if got, expected := decodedData["status"], "Failure"; got != expected {
241 t.Errorf("unexpected status %q, expected %q", got, expected)
242 }
243 if got, expected := decodedData["code"], float64(tc.statusCode); got != expected {
244 t.Errorf("unexpected code %v, expected %v", got, expected)
245 }
246 if got, expected := decodedData["reason"], tc.reason; got != expected {
247 t.Errorf("unexpected reason %v, expected %v", got, expected)
248 }
249 if got, expected := decodedData["message"], tc.message; got != expected {
250 t.Errorf("unexpected message %v, expected %v", got, expected)
251 }
252 })
253 }
254 }
255
256 func constructBody(val string, size int, field string, t *testing.T) *appsv1.Deployment {
257 var replicas int32 = 1
258 deploymentObject := &appsv1.Deployment{
259 TypeMeta: metav1.TypeMeta{
260 Kind: "Deployment",
261 APIVersion: "apps/v1",
262 },
263 ObjectMeta: metav1.ObjectMeta{
264 Namespace: "default",
265 Name: "test",
266 },
267 Spec: appsv1.DeploymentSpec{
268 Replicas: &replicas,
269 Selector: &metav1.LabelSelector{
270 MatchLabels: map[string]string{
271 "foo": "bar",
272 },
273 },
274 Strategy: appsv1.DeploymentStrategy{
275 Type: appsv1.RollingUpdateDeploymentStrategyType,
276 },
277 Template: corev1.PodTemplateSpec{
278 ObjectMeta: metav1.ObjectMeta{
279 Labels: map[string]string{"foo": "bar"},
280 },
281 Spec: corev1.PodSpec{
282 Containers: []corev1.Container{
283 {
284 Name: "foo",
285 Image: "foo",
286 },
287 },
288 },
289 },
290 },
291 }
292
293 switch field {
294 case "labels":
295 labelsMap := map[string]string{}
296 for i := 0; i < size; i++ {
297 key := val + strconv.Itoa(i)
298 labelsMap[key] = val
299 }
300 deploymentObject.ObjectMeta.Labels = labelsMap
301 case "annotations":
302 annotationsMap := map[string]string{}
303 for i := 0; i < size; i++ {
304 key := val + strconv.Itoa(i)
305 annotationsMap[key] = val
306 }
307 deploymentObject.ObjectMeta.Annotations = annotationsMap
308 case "finalizers":
309 finalizerString := []string{}
310 for i := 0; i < size; i++ {
311 finalizerString = append(finalizerString, val)
312 }
313 deploymentObject.ObjectMeta.Finalizers = finalizerString
314 default:
315 t.Fatalf("Unexpected field: %s used for making large deployment object value", field)
316 }
317
318 return deploymentObject
319 }
320
321 func TestObjectSizeResponses(t *testing.T) {
322 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--storage-media-type=application/json"}, framework.SharedEtcd())
323 defer server.TearDownFn()
324
325 server.ClientConfig.ContentType = runtime.ContentTypeJSON
326 client := clientset.NewForConfigOrDie(server.ClientConfig)
327
328
329
330
331
332
333
334 const DeploymentMegabyteSize = 25000
335 const DeploymentTwoMegabyteSize = 30000
336 const DeploymentThreeMegabyteSize = 45000
337
338 expectedMsgFor1MB := `etcdserver: request is too large`
339 expectedMsgFor2MB := `rpc error: code = ResourceExhausted desc = trying to send message larger than max`
340 expectedMsgFor3MB := `Request entity too large: limit is 3145728`
341 expectedMsgForLargeAnnotation := `metadata.annotations: Too long: must have at most 262144 bytes`
342
343 deployment1 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "labels", t)
344 deployment2 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentTwoMegabyteSize, "labels", t)
345 deployment3 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentThreeMegabyteSize, "labels", t)
346
347 deployment4 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "annotations", t)
348
349 deployment5 := constructBody("sample0123456789/sample0123456789", 2*DeploymentMegabyteSize, "finalizers", t)
350 deployment6 := constructBody("sample0123456789/sample0123456789", 2*DeploymentTwoMegabyteSize, "finalizers", t)
351 deployment7 := constructBody("sample0123456789/sample0123456789", 2*DeploymentThreeMegabyteSize, "finalizers", t)
352
353 requests := []struct {
354 size string
355 deploymentObject *appsv1.Deployment
356 expectedMessage string
357 }{
358 {"1 MB labels", deployment1, expectedMsgFor1MB},
359 {"2 MB labels", deployment2, expectedMsgFor2MB},
360 {"3 MB labels", deployment3, expectedMsgFor3MB},
361 {"1 MB annotations", deployment4, expectedMsgForLargeAnnotation},
362 {"1 MB finalizers", deployment5, expectedMsgFor1MB},
363 {"2 MB finalizers", deployment6, expectedMsgFor2MB},
364 {"3 MB finalizers", deployment7, expectedMsgFor3MB},
365 }
366
367 for _, r := range requests {
368 t.Run(r.size, func(t *testing.T) {
369 _, err := client.AppsV1().Deployments(metav1.NamespaceDefault).Create(context.TODO(), r.deploymentObject, metav1.CreateOptions{})
370 if err == nil {
371 t.Errorf("got: <nil>;want: %s", r.expectedMessage)
372 }
373 if err != nil {
374 if !strings.Contains(err.Error(), r.expectedMessage) {
375 t.Errorf("got: %s;want: %s", err.Error(), r.expectedMessage)
376 }
377 }
378 })
379 }
380 }
381
382 func TestWatchSucceedsWithoutArgs(t *testing.T) {
383 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
384 defer server.TearDownFn()
385
386 transport, err := restclient.TransportFor(server.ClientConfig)
387 if err != nil {
388 t.Fatal(err)
389 }
390
391 req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/v1/namespaces?watch=1", nil)
392 if err != nil {
393 t.Fatalf("couldn't create a request: %v", err)
394 }
395
396 resp, err := transport.RoundTrip(req)
397 if err != nil {
398 t.Fatalf("unexpected error getting response: %v", err)
399 }
400 defer resp.Body.Close()
401 if resp.StatusCode != http.StatusOK {
402 t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
403 }
404 }
405
406 var hpaV1 = `
407 {
408 "apiVersion": "autoscaling/v1",
409 "kind": "HorizontalPodAutoscaler",
410 "metadata": {
411 "name": "test-hpa",
412 "namespace": "default"
413 },
414 "spec": {
415 "scaleTargetRef": {
416 "kind": "ReplicationController",
417 "name": "test-hpa",
418 "namespace": "default"
419 },
420 "minReplicas": 1,
421 "maxReplicas": 10,
422 "targetCPUUtilizationPercentage": 50
423 }
424 }
425 `
426
427 var deploymentApps = `
428 {
429 "apiVersion": "apps/v1",
430 "kind": "Deployment",
431 "metadata": {
432 "name": "test-deployment2",
433 "namespace": "default"
434 },
435 "spec": {
436 "replicas": 1,
437 "selector": {
438 "matchLabels": {
439 "app": "nginx0"
440 }
441 },
442 "template": {
443 "metadata": {
444 "labels": {
445 "app": "nginx0"
446 }
447 },
448 "spec": {
449 "containers": [{
450 "name": "nginx",
451 "image": "registry.k8s.io/nginx:1.7.9"
452 }]
453 }
454 }
455 }
456 }
457 `
458
459 func autoscalingPath(resource, namespace, name string) string {
460 if namespace != "" {
461 namespace = path.Join("namespaces", namespace)
462 }
463 return path.Join("/apis/autoscaling/v1", namespace, resource, name)
464 }
465
466 func appsPath(resource, namespace, name string) string {
467 if namespace != "" {
468 namespace = path.Join("namespaces", namespace)
469 }
470 return path.Join("/apis/apps/v1", namespace, resource, name)
471 }
472
473 func TestAutoscalingGroupBackwardCompatibility(t *testing.T) {
474 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
475 defer server.TearDownFn()
476
477 transport, err := restclient.TransportFor(server.ClientConfig)
478 if err != nil {
479 t.Fatal(err)
480 }
481
482 requests := []struct {
483 verb string
484 URL string
485 body string
486 expectedStatusCodes map[int]bool
487 expectedVersion string
488 }{
489 {"POST", autoscalingPath("horizontalpodautoscalers", metav1.NamespaceDefault, ""), hpaV1, integration.Code201, ""},
490 {"GET", autoscalingPath("horizontalpodautoscalers", metav1.NamespaceDefault, ""), "", integration.Code200, "autoscaling/v1"},
491 }
492
493 for _, r := range requests {
494 bodyBytes := bytes.NewReader([]byte(r.body))
495 req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
496 if err != nil {
497 t.Logf("case %v", r)
498 t.Fatalf("unexpected error: %v", err)
499 }
500 func() {
501 resp, err := transport.RoundTrip(req)
502 if err != nil {
503 t.Logf("case %v", r)
504 t.Fatalf("unexpected error: %v", err)
505 }
506 defer resp.Body.Close()
507 b, _ := io.ReadAll(resp.Body)
508 body := string(b)
509 if _, ok := r.expectedStatusCodes[resp.StatusCode]; !ok {
510 t.Logf("case %v", r)
511 t.Errorf("Expected status one of %v, but got %v", r.expectedStatusCodes, resp.StatusCode)
512 t.Errorf("Body: %v", body)
513 }
514 if !strings.Contains(body, "\"apiVersion\":\""+r.expectedVersion) {
515 t.Logf("case %v", r)
516 t.Errorf("Expected version %v, got body %v", r.expectedVersion, body)
517 }
518 }()
519 }
520 }
521
522 func TestAppsGroupBackwardCompatibility(t *testing.T) {
523 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
524 defer server.TearDownFn()
525
526 transport, err := restclient.TransportFor(server.ClientConfig)
527 if err != nil {
528 t.Fatal(err)
529 }
530
531 requests := []struct {
532 verb string
533 URL string
534 body string
535 expectedStatusCodes map[int]bool
536 expectedVersion string
537 }{
538
539 {"POST", appsPath("deployments", metav1.NamespaceDefault, ""), deploymentApps, integration.Code201, ""},
540 {"GET", appsPath("deployments", metav1.NamespaceDefault, "test-deployment2"), "", integration.Code200, "apps/v1"},
541
542 {"DELETE", appsPath("deployments", metav1.NamespaceDefault, "test-deployment2") + "?propagationPolicy=Orphan", "", integration.Code200, "apps/v1"},
543 }
544
545 for _, r := range requests {
546 bodyBytes := bytes.NewReader([]byte(r.body))
547 req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
548 if err != nil {
549 t.Logf("case %v", r)
550 t.Fatalf("unexpected error: %v", err)
551 }
552 func() {
553 resp, err := transport.RoundTrip(req)
554 if err != nil {
555 t.Logf("case %v", r)
556 t.Fatalf("unexpected error: %v", err)
557 }
558 defer resp.Body.Close()
559 b, _ := io.ReadAll(resp.Body)
560 body := string(b)
561 if _, ok := r.expectedStatusCodes[resp.StatusCode]; !ok {
562 t.Logf("case %v", r)
563 t.Errorf("Expected status one of %v, but got %v", r.expectedStatusCodes, resp.StatusCode)
564 t.Errorf("Body: %v", body)
565 }
566 if !strings.Contains(body, "\"apiVersion\":\""+r.expectedVersion) {
567 t.Logf("case %v", r)
568 t.Errorf("Expected version %v, got body %v", r.expectedVersion, body)
569 }
570 }()
571 }
572 }
573
574 func TestAccept(t *testing.T) {
575 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
576 defer server.TearDownFn()
577
578 transport, err := restclient.TransportFor(server.ClientConfig)
579 if err != nil {
580 t.Fatal(err)
581 }
582
583 req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
584 if err != nil {
585 t.Fatal(err)
586 }
587 resp, err := transport.RoundTrip(req)
588 if err != nil {
589 t.Fatalf("unexpected error getting api: %v", err)
590 }
591 defer resp.Body.Close()
592 if resp.StatusCode != http.StatusOK {
593 t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
594 }
595 body, _ := io.ReadAll(resp.Body)
596 if resp.Header.Get("Content-Type") != "application/json" {
597 t.Errorf("unexpected content: %s", body)
598 }
599 if err := json.Unmarshal(body, &map[string]interface{}{}); err != nil {
600 t.Fatal(err)
601 }
602
603 req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
604 if err != nil {
605 t.Fatal(err)
606 }
607 req.Header.Set("Accept", "application/yaml")
608 resp, err = transport.RoundTrip(req)
609 if err != nil {
610 t.Fatal(err)
611 }
612 defer resp.Body.Close()
613 body, _ = io.ReadAll(resp.Body)
614 if resp.Header.Get("Content-Type") != "application/yaml" {
615 t.Errorf("unexpected content: %s", body)
616 }
617 t.Logf("body: %s", body)
618 if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil {
619 t.Fatal(err)
620 }
621
622 req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
623 if err != nil {
624 t.Fatal(err)
625 }
626 req.Header.Set("Accept", "application/json, application/yaml")
627 resp, err = transport.RoundTrip(req)
628 if err != nil {
629 t.Fatal(err)
630 }
631 defer resp.Body.Close()
632 body, _ = io.ReadAll(resp.Body)
633 if resp.Header.Get("Content-Type") != "application/json" {
634 t.Errorf("unexpected content: %s", body)
635 }
636 t.Logf("body: %s", body)
637 if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil {
638 t.Fatal(err)
639 }
640
641 req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
642 if err != nil {
643 t.Fatal(err)
644 }
645 req.Header.Set("Accept", "application")
646 resp, err = transport.RoundTrip(req)
647 if err != nil {
648 t.Fatal(err)
649 }
650 defer resp.Body.Close()
651 if resp.StatusCode != http.StatusNotAcceptable {
652 t.Errorf("unexpected error from the server")
653 }
654 }
655
656 func countEndpoints(eps *corev1.Endpoints) int {
657 count := 0
658 for i := range eps.Subsets {
659 count += len(eps.Subsets[i].Addresses) * len(eps.Subsets[i].Ports)
660 }
661 return count
662 }
663
664 func TestAPIServerService(t *testing.T) {
665 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
666 defer server.TearDownFn()
667
668 client := clientset.NewForConfigOrDie(server.ClientConfig)
669
670 err := wait.Poll(time.Second, time.Minute, func() (bool, error) {
671 svcList, err := client.CoreV1().Services(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
672 if err != nil {
673 t.Errorf("unexpected error: %v", err)
674 return false, nil
675 }
676 found := false
677 for i := range svcList.Items {
678 if svcList.Items[i].Name == "kubernetes" {
679 found = true
680 break
681 }
682 }
683 if found {
684 ep, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
685 if err != nil {
686 return false, nil
687 }
688 if countEndpoints(ep) == 0 {
689 return false, fmt.Errorf("no endpoints for kubernetes service: %v", ep)
690 }
691 return true, nil
692 }
693 return false, nil
694 })
695 if err != nil {
696 t.Errorf("unexpected error: %v", err)
697 }
698 }
699
700
701
702
703
704 func TestUpdateNodeObjects(t *testing.T) {
705 server := os.Getenv("UPDATE_NODE_APISERVER")
706 if len(server) == 0 {
707 t.Skip("UPDATE_NODE_APISERVER is not set")
708 }
709 c := clienttypedv1.NewForConfigOrDie(&restclient.Config{
710 QPS: 10000,
711 Host: server,
712 ContentConfig: restclient.ContentConfig{
713 AcceptContentTypes: "application/vnd.kubernetes.protobuf",
714 ContentType: "application/vnd.kubernetes.protobuf",
715 },
716 })
717
718 nodes := 400
719 listers := 5
720 watchers := 50
721 iterations := 10000
722
723 for i := 0; i < nodes*6; i++ {
724 c.Nodes().Delete(context.TODO(), fmt.Sprintf("node-%d", i), metav1.DeleteOptions{})
725 _, err := c.Nodes().Create(context.TODO(), &corev1.Node{
726 ObjectMeta: metav1.ObjectMeta{
727 Name: fmt.Sprintf("node-%d", i),
728 },
729 }, metav1.CreateOptions{})
730 if err != nil {
731 t.Fatal(err)
732 }
733 }
734
735 for k := 0; k < listers; k++ {
736 go func(lister int) {
737 for i := 0; i < iterations; i++ {
738 _, err := c.Nodes().List(context.TODO(), metav1.ListOptions{})
739 if err != nil {
740 fmt.Printf("[list:%d] error after %d: %v\n", lister, i, err)
741 break
742 }
743 time.Sleep(time.Duration(lister)*10*time.Millisecond + 1500*time.Millisecond)
744 }
745 }(k)
746 }
747
748 for k := 0; k < watchers; k++ {
749 go func(lister int) {
750 w, err := c.Nodes().Watch(context.TODO(), metav1.ListOptions{})
751 if err != nil {
752 fmt.Printf("[watch:%d] error: %v", lister, err)
753 return
754 }
755 i := 0
756 for r := range w.ResultChan() {
757 i++
758 if _, ok := r.Object.(*corev1.Node); !ok {
759 fmt.Printf("[watch:%d] unexpected object after %d: %#v\n", lister, i, r)
760 }
761 if i%100 == 0 {
762 fmt.Printf("[watch:%d] iteration %d ...\n", lister, i)
763 }
764 }
765 fmt.Printf("[watch:%d] done\n", lister)
766 }(k)
767 }
768
769 var wg sync.WaitGroup
770 wg.Add(nodes - listers)
771
772 for j := 0; j < nodes; j++ {
773 go func(node int) {
774 var lastCount int
775 for i := 0; i < iterations; i++ {
776 if i%100 == 0 {
777 fmt.Printf("[%d] iteration %d ...\n", node, i)
778 }
779 if i%20 == 0 {
780 _, err := c.Nodes().List(context.TODO(), metav1.ListOptions{})
781 if err != nil {
782 fmt.Printf("[%d] error after %d: %v\n", node, i, err)
783 break
784 }
785 }
786
787 r, err := c.Nodes().List(context.TODO(), metav1.ListOptions{
788 FieldSelector: fmt.Sprintf("metadata.name=node-%d", node),
789 ResourceVersion: "0",
790 })
791 if err != nil {
792 fmt.Printf("[%d] error after %d: %v\n", node, i, err)
793 break
794 }
795 if len(r.Items) != 1 {
796 fmt.Printf("[%d] error after %d: unexpected list count\n", node, i)
797 break
798 }
799
800 n, err := c.Nodes().Get(context.TODO(), fmt.Sprintf("node-%d", node), metav1.GetOptions{})
801 if err != nil {
802 fmt.Printf("[%d] error after %d: %v\n", node, i, err)
803 break
804 }
805 if len(n.Status.Conditions) != lastCount {
806 fmt.Printf("[%d] worker set %d, read %d conditions\n", node, lastCount, len(n.Status.Conditions))
807 break
808 }
809 previousCount := lastCount
810 switch {
811 case i%4 == 0:
812 lastCount = 1
813 n.Status.Conditions = []corev1.NodeCondition{
814 {
815 Type: corev1.NodeReady,
816 Status: corev1.ConditionTrue,
817 Reason: "foo",
818 },
819 }
820 case i%4 == 1:
821 lastCount = 2
822 n.Status.Conditions = []corev1.NodeCondition{
823 {
824 Type: corev1.NodeReady,
825 Status: corev1.ConditionFalse,
826 Reason: "foo",
827 },
828 {
829 Type: corev1.NodeDiskPressure,
830 Status: corev1.ConditionTrue,
831 Reason: "bar",
832 },
833 }
834 case i%4 == 2:
835 lastCount = 0
836 n.Status.Conditions = nil
837 }
838 if _, err := c.Nodes().UpdateStatus(context.TODO(), n, metav1.UpdateOptions{}); err != nil {
839 if !apierrors.IsConflict(err) {
840 fmt.Printf("[%d] error after %d: %v\n", node, i, err)
841 break
842 }
843 lastCount = previousCount
844 }
845 }
846 wg.Done()
847 fmt.Printf("[%d] done\n", node)
848 }(j)
849 }
850 wg.Wait()
851 }
852
View as plain text