...

Source file src/k8s.io/kubernetes/test/integration/controlplane/synthetic_controlplane_test.go

Documentation: k8s.io/kubernetes/test/integration/controlplane

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controlplane
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"os"
    27  	"path"
    28  	"strconv"
    29  	"strings"
    30  	"sync"
    31  	"testing"
    32  	"time"
    33  
    34  	"sigs.k8s.io/yaml"
    35  
    36  	appsv1 "k8s.io/api/apps/v1"
    37  	corev1 "k8s.io/api/core/v1"
    38  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/runtime"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	clientset "k8s.io/client-go/kubernetes"
    43  	clienttypedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
    44  	restclient "k8s.io/client-go/rest"
    45  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    46  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    47  	"k8s.io/kubernetes/test/integration"
    48  	"k8s.io/kubernetes/test/integration/framework"
    49  	"k8s.io/kubernetes/test/utils/ktesting"
    50  )
    51  
const (
	// Fake token values for testing. The trailing comments record the
	// username each token is associated with.
	AliceToken string = "abc123" // username: alice.  Present in token file.
	BobToken   string = "xyz987" // username: bob.  Present in token file.
)
    57  
    58  func testPrefix(t *testing.T, prefix string) {
    59  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
    60  	defer server.TearDownFn()
    61  
    62  	transport, err := restclient.TransportFor(server.ClientConfig)
    63  	if err != nil {
    64  		t.Fatal(err)
    65  	}
    66  	req, err := http.NewRequest("GET", server.ClientConfig.Host+prefix, nil)
    67  	if err != nil {
    68  		t.Fatalf("couldn't create a request: %v", err)
    69  	}
    70  
    71  	resp, err := transport.RoundTrip(req)
    72  	if err != nil {
    73  		t.Fatalf("unexpected error getting %s prefix: %v", prefix, err)
    74  	}
    75  	defer resp.Body.Close()
    76  	if resp.StatusCode != http.StatusOK {
    77  		t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
    78  	}
    79  }
    80  
    81  func TestAutoscalingPrefix(t *testing.T) {
    82  	testPrefix(t, "/apis/autoscaling/")
    83  }
    84  
    85  func TestBatchPrefix(t *testing.T) {
    86  	testPrefix(t, "/apis/batch/")
    87  }
    88  
    89  func TestAppsPrefix(t *testing.T) {
    90  	testPrefix(t, "/apis/apps/")
    91  }
    92  
    93  func TestKubernetesService(t *testing.T) {
    94  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
    95  	defer server.TearDownFn()
    96  
    97  	coreClient := clientset.NewForConfigOrDie(server.ClientConfig)
    98  	err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
    99  		if _, err := coreClient.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}); err != nil && apierrors.IsNotFound(err) {
   100  			return false, nil
   101  		} else if err != nil {
   102  			return false, err
   103  		}
   104  		return true, nil
   105  	})
   106  	if err != nil {
   107  		t.Fatalf("Expected kubernetes service to exist, got: %v", err)
   108  	}
   109  }
   110  
   111  func TestEmptyList(t *testing.T) {
   112  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   113  	defer server.TearDownFn()
   114  
   115  	transport, err := restclient.TransportFor(server.ClientConfig)
   116  	if err != nil {
   117  		t.Fatal(err)
   118  	}
   119  
   120  	u := server.ClientConfig.Host + "/api/v1/namespaces/default/pods"
   121  	req, err := http.NewRequest("GET", u, nil)
   122  	if err != nil {
   123  		t.Fatalf("couldn't create a request: %v", err)
   124  	}
   125  
   126  	resp, err := transport.RoundTrip(req)
   127  	if err != nil {
   128  		t.Fatalf("unexpected error getting response: %v", err)
   129  	}
   130  	defer resp.Body.Close()
   131  	if resp.StatusCode != http.StatusOK {
   132  		t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
   133  	}
   134  	data, _ := io.ReadAll(resp.Body)
   135  	decodedData := map[string]interface{}{}
   136  	if err := json.Unmarshal(data, &decodedData); err != nil {
   137  		t.Logf("body: %s", string(data))
   138  		t.Fatalf("got error decoding data: %v", err)
   139  	}
   140  	if items, ok := decodedData["items"]; !ok {
   141  		t.Logf("body: %s", string(data))
   142  		t.Fatalf("missing items field in empty list (all lists should return an items field)")
   143  	} else if items == nil {
   144  		t.Logf("body: %s", string(data))
   145  		t.Fatalf("nil items field from empty list (all lists should return non-nil empty items lists)")
   146  	}
   147  }
   148  
   149  func initStatusForbiddenControlPlaneConfig(options *options.ServerRunOptions) {
   150  	options.Authorization.Modes = []string{"AlwaysDeny"}
   151  }
   152  
   153  func initUnauthorizedControlPlaneConfig(options *options.ServerRunOptions) {
   154  	options.Authentication.Anonymous.Allow = false
   155  }
   156  
   157  func TestStatus(t *testing.T) {
   158  	testCases := []struct {
   159  		name          string
   160  		modifyOptions func(*options.ServerRunOptions)
   161  		statusCode    int
   162  		reqPath       string
   163  		reason        string
   164  		message       string
   165  	}{
   166  		{
   167  			name:       "404",
   168  			statusCode: http.StatusNotFound,
   169  			reqPath:    "/apis/batch/v1/namespaces/default/jobs/foo",
   170  			reason:     "NotFound",
   171  			message:    `jobs.batch "foo" not found`,
   172  		},
   173  		{
   174  			name:          "403",
   175  			modifyOptions: initStatusForbiddenControlPlaneConfig,
   176  			statusCode:    http.StatusForbidden,
   177  			reqPath:       "/apis",
   178  			reason:        "Forbidden",
   179  			message:       `forbidden: User "system:anonymous" cannot get path "/apis": Everything is forbidden.`,
   180  		},
   181  		{
   182  			name:          "401",
   183  			modifyOptions: initUnauthorizedControlPlaneConfig,
   184  			statusCode:    http.StatusUnauthorized,
   185  			reqPath:       "/apis",
   186  			reason:        "Unauthorized",
   187  			message:       `Unauthorized`,
   188  		},
   189  	}
   190  
   191  	for _, tc := range testCases {
   192  		t.Run(tc.name, func(t *testing.T) {
   193  			tCtx := ktesting.Init(t)
   194  			_, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
   195  				ModifyServerRunOptions: func(options *options.ServerRunOptions) {
   196  					if tc.modifyOptions != nil {
   197  						tc.modifyOptions(options)
   198  					}
   199  				},
   200  			})
   201  			defer tearDownFn()
   202  
   203  			// When modifying authenticator and authorizer, don't use
   204  			// bearer token than will be always authorized.
   205  			if tc.modifyOptions != nil {
   206  				kubeConfig.BearerToken = ""
   207  			}
   208  			transport, err := restclient.TransportFor(kubeConfig)
   209  			if err != nil {
   210  				t.Fatal(err)
   211  			}
   212  
   213  			req, err := http.NewRequest("GET", kubeConfig.Host+tc.reqPath, nil)
   214  			if err != nil {
   215  				t.Fatalf("unexpected error: %v", err)
   216  			}
   217  			resp, err := transport.RoundTrip(req)
   218  			if err != nil {
   219  				t.Fatalf("unexpected error: %v", err)
   220  			}
   221  			defer resp.Body.Close()
   222  
   223  			if resp.StatusCode != tc.statusCode {
   224  				t.Fatalf("got status %v instead of %s", resp.StatusCode, tc.name)
   225  			}
   226  			data, _ := io.ReadAll(resp.Body)
   227  			decodedData := map[string]interface{}{}
   228  			if err := json.Unmarshal(data, &decodedData); err != nil {
   229  				t.Logf("body: %s", string(data))
   230  				t.Fatalf("got error decoding data: %v", err)
   231  			}
   232  			t.Logf("body: %s", string(data))
   233  
   234  			if got, expected := decodedData["apiVersion"], "v1"; got != expected {
   235  				t.Errorf("unexpected apiVersion %q, expected %q", got, expected)
   236  			}
   237  			if got, expected := decodedData["kind"], "Status"; got != expected {
   238  				t.Errorf("unexpected kind %q, expected %q", got, expected)
   239  			}
   240  			if got, expected := decodedData["status"], "Failure"; got != expected {
   241  				t.Errorf("unexpected status %q, expected %q", got, expected)
   242  			}
   243  			if got, expected := decodedData["code"], float64(tc.statusCode); got != expected {
   244  				t.Errorf("unexpected code %v, expected %v", got, expected)
   245  			}
   246  			if got, expected := decodedData["reason"], tc.reason; got != expected {
   247  				t.Errorf("unexpected reason %v, expected %v", got, expected)
   248  			}
   249  			if got, expected := decodedData["message"], tc.message; got != expected {
   250  				t.Errorf("unexpected message %v, expected %v", got, expected)
   251  			}
   252  		})
   253  	}
   254  }
   255  
   256  func constructBody(val string, size int, field string, t *testing.T) *appsv1.Deployment {
   257  	var replicas int32 = 1
   258  	deploymentObject := &appsv1.Deployment{
   259  		TypeMeta: metav1.TypeMeta{
   260  			Kind:       "Deployment",
   261  			APIVersion: "apps/v1",
   262  		},
   263  		ObjectMeta: metav1.ObjectMeta{
   264  			Namespace: "default",
   265  			Name:      "test",
   266  		},
   267  		Spec: appsv1.DeploymentSpec{
   268  			Replicas: &replicas,
   269  			Selector: &metav1.LabelSelector{
   270  				MatchLabels: map[string]string{
   271  					"foo": "bar",
   272  				},
   273  			},
   274  			Strategy: appsv1.DeploymentStrategy{
   275  				Type: appsv1.RollingUpdateDeploymentStrategyType,
   276  			},
   277  			Template: corev1.PodTemplateSpec{
   278  				ObjectMeta: metav1.ObjectMeta{
   279  					Labels: map[string]string{"foo": "bar"},
   280  				},
   281  				Spec: corev1.PodSpec{
   282  					Containers: []corev1.Container{
   283  						{
   284  							Name:  "foo",
   285  							Image: "foo",
   286  						},
   287  					},
   288  				},
   289  			},
   290  		},
   291  	}
   292  
   293  	switch field {
   294  	case "labels":
   295  		labelsMap := map[string]string{}
   296  		for i := 0; i < size; i++ {
   297  			key := val + strconv.Itoa(i)
   298  			labelsMap[key] = val
   299  		}
   300  		deploymentObject.ObjectMeta.Labels = labelsMap
   301  	case "annotations":
   302  		annotationsMap := map[string]string{}
   303  		for i := 0; i < size; i++ {
   304  			key := val + strconv.Itoa(i)
   305  			annotationsMap[key] = val
   306  		}
   307  		deploymentObject.ObjectMeta.Annotations = annotationsMap
   308  	case "finalizers":
   309  		finalizerString := []string{}
   310  		for i := 0; i < size; i++ {
   311  			finalizerString = append(finalizerString, val)
   312  		}
   313  		deploymentObject.ObjectMeta.Finalizers = finalizerString
   314  	default:
   315  		t.Fatalf("Unexpected field: %s used for making large deployment object value", field)
   316  	}
   317  
   318  	return deploymentObject
   319  }
   320  
   321  func TestObjectSizeResponses(t *testing.T) {
   322  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--storage-media-type=application/json"}, framework.SharedEtcd())
   323  	defer server.TearDownFn()
   324  
   325  	server.ClientConfig.ContentType = runtime.ContentTypeJSON
   326  	client := clientset.NewForConfigOrDie(server.ClientConfig)
   327  
   328  	// Computing ManagedFields is extremely inefficient for large object, e.g.
   329  	// it may take 10s+ to just compute it if we have ~100k very small labels or
   330  	// annotations.  This in turn may lead to timing out requests,
   331  	// which have hardcoded timeout of 34 seconds.
   332  	// As a result, we're using slightly larger individual labels/annotations
   333  	// to reduce the number of those.
   334  	const DeploymentMegabyteSize = 25000
   335  	const DeploymentTwoMegabyteSize = 30000
   336  	const DeploymentThreeMegabyteSize = 45000
   337  
   338  	expectedMsgFor1MB := `etcdserver: request is too large`
   339  	expectedMsgFor2MB := `rpc error: code = ResourceExhausted desc = trying to send message larger than max`
   340  	expectedMsgFor3MB := `Request entity too large: limit is 3145728`
   341  	expectedMsgForLargeAnnotation := `metadata.annotations: Too long: must have at most 262144 bytes`
   342  
   343  	deployment1 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "labels", t)      // >1.5 MB file
   344  	deployment2 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentTwoMegabyteSize, "labels", t)   // >2 MB file
   345  	deployment3 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentThreeMegabyteSize, "labels", t) // >3 MB file
   346  
   347  	deployment4 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "annotations", t)
   348  
   349  	deployment5 := constructBody("sample0123456789/sample0123456789", 2*DeploymentMegabyteSize, "finalizers", t)      // >1.5 MB file
   350  	deployment6 := constructBody("sample0123456789/sample0123456789", 2*DeploymentTwoMegabyteSize, "finalizers", t)   // >2 MB file
   351  	deployment7 := constructBody("sample0123456789/sample0123456789", 2*DeploymentThreeMegabyteSize, "finalizers", t) // >3 MB file
   352  
   353  	requests := []struct {
   354  		size             string
   355  		deploymentObject *appsv1.Deployment
   356  		expectedMessage  string
   357  	}{
   358  		{"1 MB labels", deployment1, expectedMsgFor1MB},
   359  		{"2 MB labels", deployment2, expectedMsgFor2MB},
   360  		{"3 MB labels", deployment3, expectedMsgFor3MB},
   361  		{"1 MB annotations", deployment4, expectedMsgForLargeAnnotation},
   362  		{"1 MB finalizers", deployment5, expectedMsgFor1MB},
   363  		{"2 MB finalizers", deployment6, expectedMsgFor2MB},
   364  		{"3 MB finalizers", deployment7, expectedMsgFor3MB},
   365  	}
   366  
   367  	for _, r := range requests {
   368  		t.Run(r.size, func(t *testing.T) {
   369  			_, err := client.AppsV1().Deployments(metav1.NamespaceDefault).Create(context.TODO(), r.deploymentObject, metav1.CreateOptions{})
   370  			if err == nil {
   371  				t.Errorf("got: <nil>;want: %s", r.expectedMessage)
   372  			}
   373  			if err != nil {
   374  				if !strings.Contains(err.Error(), r.expectedMessage) {
   375  					t.Errorf("got: %s;want: %s", err.Error(), r.expectedMessage)
   376  				}
   377  			}
   378  		})
   379  	}
   380  }
   381  
   382  func TestWatchSucceedsWithoutArgs(t *testing.T) {
   383  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   384  	defer server.TearDownFn()
   385  
   386  	transport, err := restclient.TransportFor(server.ClientConfig)
   387  	if err != nil {
   388  		t.Fatal(err)
   389  	}
   390  
   391  	req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/v1/namespaces?watch=1", nil)
   392  	if err != nil {
   393  		t.Fatalf("couldn't create a request: %v", err)
   394  	}
   395  
   396  	resp, err := transport.RoundTrip(req)
   397  	if err != nil {
   398  		t.Fatalf("unexpected error getting response: %v", err)
   399  	}
   400  	defer resp.Body.Close()
   401  	if resp.StatusCode != http.StatusOK {
   402  		t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
   403  	}
   404  }
   405  
// hpaV1 is a JSON manifest for an autoscaling/v1 HorizontalPodAutoscaler
// named "test-hpa" in the "default" namespace. It is the POST body used by
// TestAutoscalingGroupBackwardCompatibility.
var hpaV1 = `
{
  "apiVersion": "autoscaling/v1",
  "kind": "HorizontalPodAutoscaler",
  "metadata": {
    "name": "test-hpa",
    "namespace": "default"
  },
  "spec": {
    "scaleTargetRef": {
      "kind": "ReplicationController",
      "name": "test-hpa",
      "namespace": "default"
    },
    "minReplicas": 1,
    "maxReplicas": 10,
    "targetCPUUtilizationPercentage": 50
  }
}
`
   426  
// deploymentApps is a JSON manifest for an apps/v1 Deployment named
// "test-deployment2" in the "default" namespace. It is the POST body used by
// TestAppsGroupBackwardCompatibility.
var deploymentApps = `
{
  "apiVersion": "apps/v1",
  "kind": "Deployment",
  "metadata": {
     "name": "test-deployment2",
     "namespace": "default"
  },
  "spec": {
    "replicas": 1,
    "selector": {
      "matchLabels": {
        "app": "nginx0"
      }
    },
    "template": {
      "metadata": {
        "labels": {
          "app": "nginx0"
        }
      },
      "spec": {
        "containers": [{
          "name": "nginx",
          "image": "registry.k8s.io/nginx:1.7.9"
        }]
      }
    }
  }
}
`
   458  
   459  func autoscalingPath(resource, namespace, name string) string {
   460  	if namespace != "" {
   461  		namespace = path.Join("namespaces", namespace)
   462  	}
   463  	return path.Join("/apis/autoscaling/v1", namespace, resource, name)
   464  }
   465  
   466  func appsPath(resource, namespace, name string) string {
   467  	if namespace != "" {
   468  		namespace = path.Join("namespaces", namespace)
   469  	}
   470  	return path.Join("/apis/apps/v1", namespace, resource, name)
   471  }
   472  
   473  func TestAutoscalingGroupBackwardCompatibility(t *testing.T) {
   474  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   475  	defer server.TearDownFn()
   476  
   477  	transport, err := restclient.TransportFor(server.ClientConfig)
   478  	if err != nil {
   479  		t.Fatal(err)
   480  	}
   481  
   482  	requests := []struct {
   483  		verb                string
   484  		URL                 string
   485  		body                string
   486  		expectedStatusCodes map[int]bool
   487  		expectedVersion     string
   488  	}{
   489  		{"POST", autoscalingPath("horizontalpodautoscalers", metav1.NamespaceDefault, ""), hpaV1, integration.Code201, ""},
   490  		{"GET", autoscalingPath("horizontalpodautoscalers", metav1.NamespaceDefault, ""), "", integration.Code200, "autoscaling/v1"},
   491  	}
   492  
   493  	for _, r := range requests {
   494  		bodyBytes := bytes.NewReader([]byte(r.body))
   495  		req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
   496  		if err != nil {
   497  			t.Logf("case %v", r)
   498  			t.Fatalf("unexpected error: %v", err)
   499  		}
   500  		func() {
   501  			resp, err := transport.RoundTrip(req)
   502  			if err != nil {
   503  				t.Logf("case %v", r)
   504  				t.Fatalf("unexpected error: %v", err)
   505  			}
   506  			defer resp.Body.Close()
   507  			b, _ := io.ReadAll(resp.Body)
   508  			body := string(b)
   509  			if _, ok := r.expectedStatusCodes[resp.StatusCode]; !ok {
   510  				t.Logf("case %v", r)
   511  				t.Errorf("Expected status one of %v, but got %v", r.expectedStatusCodes, resp.StatusCode)
   512  				t.Errorf("Body: %v", body)
   513  			}
   514  			if !strings.Contains(body, "\"apiVersion\":\""+r.expectedVersion) {
   515  				t.Logf("case %v", r)
   516  				t.Errorf("Expected version %v, got body %v", r.expectedVersion, body)
   517  			}
   518  		}()
   519  	}
   520  }
   521  
   522  func TestAppsGroupBackwardCompatibility(t *testing.T) {
   523  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   524  	defer server.TearDownFn()
   525  
   526  	transport, err := restclient.TransportFor(server.ClientConfig)
   527  	if err != nil {
   528  		t.Fatal(err)
   529  	}
   530  
   531  	requests := []struct {
   532  		verb                string
   533  		URL                 string
   534  		body                string
   535  		expectedStatusCodes map[int]bool
   536  		expectedVersion     string
   537  	}{
   538  		// Post to apps endpoint and get back from apps
   539  		{"POST", appsPath("deployments", metav1.NamespaceDefault, ""), deploymentApps, integration.Code201, ""},
   540  		{"GET", appsPath("deployments", metav1.NamespaceDefault, "test-deployment2"), "", integration.Code200, "apps/v1"},
   541  		// set propagationPolicy=Orphan to force the object to be returned so we can check the apiVersion (otherwise, we just get a status object back)
   542  		{"DELETE", appsPath("deployments", metav1.NamespaceDefault, "test-deployment2") + "?propagationPolicy=Orphan", "", integration.Code200, "apps/v1"},
   543  	}
   544  
   545  	for _, r := range requests {
   546  		bodyBytes := bytes.NewReader([]byte(r.body))
   547  		req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
   548  		if err != nil {
   549  			t.Logf("case %v", r)
   550  			t.Fatalf("unexpected error: %v", err)
   551  		}
   552  		func() {
   553  			resp, err := transport.RoundTrip(req)
   554  			if err != nil {
   555  				t.Logf("case %v", r)
   556  				t.Fatalf("unexpected error: %v", err)
   557  			}
   558  			defer resp.Body.Close()
   559  			b, _ := io.ReadAll(resp.Body)
   560  			body := string(b)
   561  			if _, ok := r.expectedStatusCodes[resp.StatusCode]; !ok {
   562  				t.Logf("case %v", r)
   563  				t.Errorf("Expected status one of %v, but got %v", r.expectedStatusCodes, resp.StatusCode)
   564  				t.Errorf("Body: %v", body)
   565  			}
   566  			if !strings.Contains(body, "\"apiVersion\":\""+r.expectedVersion) {
   567  				t.Logf("case %v", r)
   568  				t.Errorf("Expected version %v, got body %v", r.expectedVersion, body)
   569  			}
   570  		}()
   571  	}
   572  }
   573  
   574  func TestAccept(t *testing.T) {
   575  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   576  	defer server.TearDownFn()
   577  
   578  	transport, err := restclient.TransportFor(server.ClientConfig)
   579  	if err != nil {
   580  		t.Fatal(err)
   581  	}
   582  
   583  	req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
   584  	if err != nil {
   585  		t.Fatal(err)
   586  	}
   587  	resp, err := transport.RoundTrip(req)
   588  	if err != nil {
   589  		t.Fatalf("unexpected error getting api: %v", err)
   590  	}
   591  	defer resp.Body.Close()
   592  	if resp.StatusCode != http.StatusOK {
   593  		t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
   594  	}
   595  	body, _ := io.ReadAll(resp.Body)
   596  	if resp.Header.Get("Content-Type") != "application/json" {
   597  		t.Errorf("unexpected content: %s", body)
   598  	}
   599  	if err := json.Unmarshal(body, &map[string]interface{}{}); err != nil {
   600  		t.Fatal(err)
   601  	}
   602  
   603  	req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
   604  	if err != nil {
   605  		t.Fatal(err)
   606  	}
   607  	req.Header.Set("Accept", "application/yaml")
   608  	resp, err = transport.RoundTrip(req)
   609  	if err != nil {
   610  		t.Fatal(err)
   611  	}
   612  	defer resp.Body.Close()
   613  	body, _ = io.ReadAll(resp.Body)
   614  	if resp.Header.Get("Content-Type") != "application/yaml" {
   615  		t.Errorf("unexpected content: %s", body)
   616  	}
   617  	t.Logf("body: %s", body)
   618  	if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil {
   619  		t.Fatal(err)
   620  	}
   621  
   622  	req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
   623  	if err != nil {
   624  		t.Fatal(err)
   625  	}
   626  	req.Header.Set("Accept", "application/json, application/yaml")
   627  	resp, err = transport.RoundTrip(req)
   628  	if err != nil {
   629  		t.Fatal(err)
   630  	}
   631  	defer resp.Body.Close()
   632  	body, _ = io.ReadAll(resp.Body)
   633  	if resp.Header.Get("Content-Type") != "application/json" {
   634  		t.Errorf("unexpected content: %s", body)
   635  	}
   636  	t.Logf("body: %s", body)
   637  	if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil {
   638  		t.Fatal(err)
   639  	}
   640  
   641  	req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
   642  	if err != nil {
   643  		t.Fatal(err)
   644  	}
   645  	req.Header.Set("Accept", "application") // not a valid media type
   646  	resp, err = transport.RoundTrip(req)
   647  	if err != nil {
   648  		t.Fatal(err)
   649  	}
   650  	defer resp.Body.Close()
   651  	if resp.StatusCode != http.StatusNotAcceptable {
   652  		t.Errorf("unexpected error from the server")
   653  	}
   654  }
   655  
   656  func countEndpoints(eps *corev1.Endpoints) int {
   657  	count := 0
   658  	for i := range eps.Subsets {
   659  		count += len(eps.Subsets[i].Addresses) * len(eps.Subsets[i].Ports)
   660  	}
   661  	return count
   662  }
   663  
   664  func TestAPIServerService(t *testing.T) {
   665  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
   666  	defer server.TearDownFn()
   667  
   668  	client := clientset.NewForConfigOrDie(server.ClientConfig)
   669  
   670  	err := wait.Poll(time.Second, time.Minute, func() (bool, error) {
   671  		svcList, err := client.CoreV1().Services(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
   672  		if err != nil {
   673  			t.Errorf("unexpected error: %v", err)
   674  			return false, nil
   675  		}
   676  		found := false
   677  		for i := range svcList.Items {
   678  			if svcList.Items[i].Name == "kubernetes" {
   679  				found = true
   680  				break
   681  			}
   682  		}
   683  		if found {
   684  			ep, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
   685  			if err != nil {
   686  				return false, nil
   687  			}
   688  			if countEndpoints(ep) == 0 {
   689  				return false, fmt.Errorf("no endpoints for kubernetes service: %v", ep)
   690  			}
   691  			return true, nil
   692  		}
   693  		return false, nil
   694  	})
   695  	if err != nil {
   696  		t.Errorf("unexpected error: %v", err)
   697  	}
   698  }
   699  
   700  // TestUpdateNodeObjects represents a simple version of the behavior of node checkins at steady
   701  // state. This test allows for easy profiling of a realistic primary scenario for baseline CPU
   702  // in very large clusters. It is disabled by default - start a kube-apiserver and pass
   703  // UPDATE_NODE_APISERVER as the host value.
   704  func TestUpdateNodeObjects(t *testing.T) {
   705  	server := os.Getenv("UPDATE_NODE_APISERVER")
   706  	if len(server) == 0 {
   707  		t.Skip("UPDATE_NODE_APISERVER is not set")
   708  	}
   709  	c := clienttypedv1.NewForConfigOrDie(&restclient.Config{
   710  		QPS:  10000,
   711  		Host: server,
   712  		ContentConfig: restclient.ContentConfig{
   713  			AcceptContentTypes: "application/vnd.kubernetes.protobuf",
   714  			ContentType:        "application/vnd.kubernetes.protobuf",
   715  		},
   716  	})
   717  
   718  	nodes := 400
   719  	listers := 5
   720  	watchers := 50
   721  	iterations := 10000
   722  
   723  	for i := 0; i < nodes*6; i++ {
   724  		c.Nodes().Delete(context.TODO(), fmt.Sprintf("node-%d", i), metav1.DeleteOptions{})
   725  		_, err := c.Nodes().Create(context.TODO(), &corev1.Node{
   726  			ObjectMeta: metav1.ObjectMeta{
   727  				Name: fmt.Sprintf("node-%d", i),
   728  			},
   729  		}, metav1.CreateOptions{})
   730  		if err != nil {
   731  			t.Fatal(err)
   732  		}
   733  	}
   734  
   735  	for k := 0; k < listers; k++ {
   736  		go func(lister int) {
   737  			for i := 0; i < iterations; i++ {
   738  				_, err := c.Nodes().List(context.TODO(), metav1.ListOptions{})
   739  				if err != nil {
   740  					fmt.Printf("[list:%d] error after %d: %v\n", lister, i, err)
   741  					break
   742  				}
   743  				time.Sleep(time.Duration(lister)*10*time.Millisecond + 1500*time.Millisecond)
   744  			}
   745  		}(k)
   746  	}
   747  
   748  	for k := 0; k < watchers; k++ {
   749  		go func(lister int) {
   750  			w, err := c.Nodes().Watch(context.TODO(), metav1.ListOptions{})
   751  			if err != nil {
   752  				fmt.Printf("[watch:%d] error: %v", lister, err)
   753  				return
   754  			}
   755  			i := 0
   756  			for r := range w.ResultChan() {
   757  				i++
   758  				if _, ok := r.Object.(*corev1.Node); !ok {
   759  					fmt.Printf("[watch:%d] unexpected object after %d: %#v\n", lister, i, r)
   760  				}
   761  				if i%100 == 0 {
   762  					fmt.Printf("[watch:%d] iteration %d ...\n", lister, i)
   763  				}
   764  			}
   765  			fmt.Printf("[watch:%d] done\n", lister)
   766  		}(k)
   767  	}
   768  
   769  	var wg sync.WaitGroup
   770  	wg.Add(nodes - listers)
   771  
   772  	for j := 0; j < nodes; j++ {
   773  		go func(node int) {
   774  			var lastCount int
   775  			for i := 0; i < iterations; i++ {
   776  				if i%100 == 0 {
   777  					fmt.Printf("[%d] iteration %d ...\n", node, i)
   778  				}
   779  				if i%20 == 0 {
   780  					_, err := c.Nodes().List(context.TODO(), metav1.ListOptions{})
   781  					if err != nil {
   782  						fmt.Printf("[%d] error after %d: %v\n", node, i, err)
   783  						break
   784  					}
   785  				}
   786  
   787  				r, err := c.Nodes().List(context.TODO(), metav1.ListOptions{
   788  					FieldSelector:   fmt.Sprintf("metadata.name=node-%d", node),
   789  					ResourceVersion: "0",
   790  				})
   791  				if err != nil {
   792  					fmt.Printf("[%d] error after %d: %v\n", node, i, err)
   793  					break
   794  				}
   795  				if len(r.Items) != 1 {
   796  					fmt.Printf("[%d] error after %d: unexpected list count\n", node, i)
   797  					break
   798  				}
   799  
   800  				n, err := c.Nodes().Get(context.TODO(), fmt.Sprintf("node-%d", node), metav1.GetOptions{})
   801  				if err != nil {
   802  					fmt.Printf("[%d] error after %d: %v\n", node, i, err)
   803  					break
   804  				}
   805  				if len(n.Status.Conditions) != lastCount {
   806  					fmt.Printf("[%d] worker set %d, read %d conditions\n", node, lastCount, len(n.Status.Conditions))
   807  					break
   808  				}
   809  				previousCount := lastCount
   810  				switch {
   811  				case i%4 == 0:
   812  					lastCount = 1
   813  					n.Status.Conditions = []corev1.NodeCondition{
   814  						{
   815  							Type:   corev1.NodeReady,
   816  							Status: corev1.ConditionTrue,
   817  							Reason: "foo",
   818  						},
   819  					}
   820  				case i%4 == 1:
   821  					lastCount = 2
   822  					n.Status.Conditions = []corev1.NodeCondition{
   823  						{
   824  							Type:   corev1.NodeReady,
   825  							Status: corev1.ConditionFalse,
   826  							Reason: "foo",
   827  						},
   828  						{
   829  							Type:   corev1.NodeDiskPressure,
   830  							Status: corev1.ConditionTrue,
   831  							Reason: "bar",
   832  						},
   833  					}
   834  				case i%4 == 2:
   835  					lastCount = 0
   836  					n.Status.Conditions = nil
   837  				}
   838  				if _, err := c.Nodes().UpdateStatus(context.TODO(), n, metav1.UpdateOptions{}); err != nil {
   839  					if !apierrors.IsConflict(err) {
   840  						fmt.Printf("[%d] error after %d: %v\n", node, i, err)
   841  						break
   842  					}
   843  					lastCount = previousCount
   844  				}
   845  			}
   846  			wg.Done()
   847  			fmt.Printf("[%d] done\n", node)
   848  		}(j)
   849  	}
   850  	wg.Wait()
   851  }
   852  

View as plain text