...

Source file src/k8s.io/kubernetes/test/integration/controlplane/kube_apiserver_test.go

Documentation: k8s.io/kubernetes/test/integration/controlplane

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controlplane
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"net/http"
    25  	"reflect"
    26  	"strings"
    27  	"sync"
    28  	"testing"
    29  	"time"
    30  
    31  	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
    32  
    33  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    34  
    35  	appsv1 "k8s.io/api/apps/v1"
    36  	corev1 "k8s.io/api/core/v1"
    37  	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    38  	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/util/intstr"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	"k8s.io/client-go/kubernetes"
    43  	"k8s.io/kube-aggregator/pkg/apis/apiregistration"
    44  	"k8s.io/kube-openapi/pkg/validation/spec"
    45  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    46  	"k8s.io/kubernetes/test/integration/etcd"
    47  	"k8s.io/kubernetes/test/integration/framework"
    48  )
    49  
    50  const (
    51  	// testApiextensionsOverlapProbeString is a probe string which identifies whether
    52  	// a CRD change triggers an OpenAPI spec change
    53  	testApiextensionsOverlapProbeString = "testApiextensionsOverlapProbeField"
    54  )
    55  
    56  func TestRun(t *testing.T) {
    57  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
    58  	defer server.TearDownFn()
    59  
    60  	client, err := kubernetes.NewForConfig(server.ClientConfig)
    61  	if err != nil {
    62  		t.Fatalf("unexpected error: %v", err)
    63  	}
    64  
    65  	// test whether the server is really healthy after /healthz told us so
    66  	t.Logf("Creating Deployment directly after being healthy")
    67  	var replicas int32 = 1
    68  	_, err = client.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{
    69  		TypeMeta: metav1.TypeMeta{
    70  			Kind:       "Deployment",
    71  			APIVersion: "apps/v1",
    72  		},
    73  		ObjectMeta: metav1.ObjectMeta{
    74  			Namespace: "default",
    75  			Name:      "test",
    76  			Labels:    map[string]string{"foo": "bar"},
    77  		},
    78  		Spec: appsv1.DeploymentSpec{
    79  			Replicas: &replicas,
    80  			Strategy: appsv1.DeploymentStrategy{
    81  				Type: appsv1.RollingUpdateDeploymentStrategyType,
    82  			},
    83  			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
    84  			Template: corev1.PodTemplateSpec{
    85  				ObjectMeta: metav1.ObjectMeta{
    86  					Labels: map[string]string{"foo": "bar"},
    87  				},
    88  				Spec: corev1.PodSpec{
    89  					Containers: []corev1.Container{
    90  						{
    91  							Name:  "foo",
    92  							Image: "foo",
    93  						},
    94  					},
    95  				},
    96  			},
    97  		},
    98  	}, metav1.CreateOptions{})
    99  	if err != nil {
   100  		t.Fatalf("Failed to create deployment: %v", err)
   101  	}
   102  }
   103  
   104  func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) (bool, error) {
   105  	res := client.CoreV1().RESTClient().Get().RequestURI(path).Do(context.TODO())
   106  	var status int
   107  	res.StatusCode(&status)
   108  	_, err := res.Raw()
   109  	if err != nil {
   110  		return false, err
   111  	}
   112  	return status == http.StatusOK, nil
   113  }
   114  
   115  func TestLivezAndReadyz(t *testing.T) {
   116  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--livez-grace-period", "0s"}, framework.SharedEtcd())
   117  	defer server.TearDownFn()
   118  
   119  	client, err := kubernetes.NewForConfig(server.ClientConfig)
   120  	if err != nil {
   121  		t.Fatalf("unexpected error: %v", err)
   122  	}
   123  	if statusOK, err := endpointReturnsStatusOK(client, "/livez"); err != nil || !statusOK {
   124  		t.Fatalf("livez should be healthy, got %v and error %v", statusOK, err)
   125  	}
   126  	if statusOK, err := endpointReturnsStatusOK(client, "/readyz"); err != nil || !statusOK {
   127  		t.Fatalf("readyz should be healthy, got %v and error %v", statusOK, err)
   128  	}
   129  }
   130  
   131  // TestOpenAPIDelegationChainPlumbing is a smoke test that checks for
   132  // the existence of some representative paths from the
   133  // apiextensions-server and the kube-aggregator server, both part of
   134  // the delegation chain in kube-apiserver.
   135  func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
   136  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   137  	defer server.TearDownFn()
   138  
   139  	kubeclient, err := kubernetes.NewForConfig(server.ClientConfig)
   140  	if err != nil {
   141  		t.Fatalf("unexpected error: %v", err)
   142  	}
   143  
   144  	result := kubeclient.RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO())
   145  	status := 0
   146  	result.StatusCode(&status)
   147  	if status != http.StatusOK {
   148  		t.Fatalf("GET /openapi/v2 failed: expected status=%d, got=%d", http.StatusOK, status)
   149  	}
   150  
   151  	raw, err := result.Raw()
   152  	if err != nil {
   153  		t.Fatalf("Unexpected error: %v", err)
   154  	}
   155  
   156  	type openAPISchema struct {
   157  		Paths map[string]interface{} `json:"paths"`
   158  	}
   159  
   160  	var doc openAPISchema
   161  	err = json.Unmarshal(raw, &doc)
   162  	if err != nil {
   163  		t.Fatalf("Failed to unmarshal: %v", err)
   164  	}
   165  
   166  	matchedExtension := false
   167  	extensionsPrefix := "/apis/" + apiextensionsv1beta1.GroupName
   168  
   169  	matchedRegistration := false
   170  	registrationPrefix := "/apis/" + apiregistration.GroupName
   171  
   172  	for path := range doc.Paths {
   173  		if strings.HasPrefix(path, extensionsPrefix) {
   174  			matchedExtension = true
   175  		}
   176  		if strings.HasPrefix(path, registrationPrefix) {
   177  			matchedRegistration = true
   178  		}
   179  		if matchedExtension && matchedRegistration {
   180  			return
   181  		}
   182  	}
   183  
   184  	if !matchedExtension {
   185  		t.Errorf("missing path: %q", extensionsPrefix)
   186  	}
   187  
   188  	if !matchedRegistration {
   189  		t.Errorf("missing path: %q", registrationPrefix)
   190  	}
   191  }
   192  
   193  func TestOpenAPIApiextensionsOverlapProtection(t *testing.T) {
   194  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
   195  	defer server.TearDownFn()
   196  	apiextensionsclient, err := apiextensionsclientset.NewForConfig(server.ClientConfig)
   197  	if err != nil {
   198  		t.Fatalf("unexpected error: %v", err)
   199  	}
   200  	crdPath, exist, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1/customresourcedefinitions/{name}`)
   201  	if err != nil {
   202  		t.Fatalf("unexpected error getting CRD OpenAPI path: %v", err)
   203  	}
   204  	if !exist {
   205  		t.Fatalf("unexpected error: apiextensions OpenAPI path doesn't exist")
   206  	}
   207  
   208  	// Create a CRD that overlaps OpenAPI path with the CRD API
   209  	crd := &apiextensionsv1.CustomResourceDefinition{
   210  		ObjectMeta: metav1.ObjectMeta{
   211  			Name:        "customresourcedefinitions.apiextensions.k8s.io",
   212  			Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
   213  		},
   214  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   215  			Group: "apiextensions.k8s.io",
   216  			Scope: apiextensionsv1.ClusterScoped,
   217  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   218  				Plural:   "customresourcedefinitions",
   219  				Singular: "customresourcedefinition",
   220  				Kind:     "CustomResourceDefinition",
   221  				ListKind: "CustomResourceDefinitionList",
   222  			},
   223  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
   224  				{
   225  					Name:    "v1",
   226  					Served:  true,
   227  					Storage: true,
   228  					Schema: &apiextensionsv1.CustomResourceValidation{
   229  						OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
   230  							Type: "object",
   231  							Properties: map[string]apiextensionsv1.JSONSchemaProps{
   232  								testApiextensionsOverlapProbeString: {Type: "boolean"},
   233  							},
   234  						},
   235  					},
   236  				},
   237  			},
   238  		},
   239  	}
   240  	etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
   241  
   242  	// Create a probe CRD foo that triggers an OpenAPI spec change
   243  	if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "foo"); err != nil {
   244  		t.Fatalf("unexpected error: %v", err)
   245  	}
   246  
   247  	// Expect the CRD path to not change
   248  	path, _, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1/customresourcedefinitions/{name}`)
   249  	if err != nil {
   250  		t.Fatalf("unexpected error: %v", err)
   251  	}
   252  	pathBytes, err := json.Marshal(path)
   253  	if err != nil {
   254  		t.Fatalf("unexpected error: %v", err)
   255  	}
   256  	crdPathBytes, err := json.Marshal(crdPath)
   257  	if err != nil {
   258  		t.Fatalf("unexpected error: %v", err)
   259  	}
   260  	if !bytes.Equal(pathBytes, crdPathBytes) {
   261  		t.Fatalf("expected CRD OpenAPI path to not change, but got different results: want %q, got %q", string(crdPathBytes), string(pathBytes))
   262  	}
   263  
   264  	// Expect the orphan definition to be pruned from the spec
   265  	exist, err = specHasProbe(apiextensionsclient, testApiextensionsOverlapProbeString)
   266  	if err != nil {
   267  		t.Fatalf("unexpected error: %v", err)
   268  	}
   269  	if exist {
   270  		t.Fatalf("unexpected error: orphan definition isn't pruned")
   271  	}
   272  
   273  	// Create a CRD that overlaps OpenAPI definition with the CRD API
   274  	crd = &apiextensionsv1.CustomResourceDefinition{
   275  		ObjectMeta: metav1.ObjectMeta{
   276  			Name:        "customresourcedefinitions.apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
   277  			Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
   278  		},
   279  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   280  			Group: "apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
   281  			Scope: apiextensionsv1.ClusterScoped,
   282  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   283  				Plural:   "customresourcedefinitions",
   284  				Singular: "customresourcedefinition",
   285  				Kind:     "CustomResourceDefinition",
   286  				ListKind: "CustomResourceDefinitionList",
   287  			},
   288  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
   289  				{
   290  					Name:    "v1",
   291  					Served:  true,
   292  					Storage: true,
   293  					Schema: &apiextensionsv1.CustomResourceValidation{
   294  						OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
   295  							Type: "object",
   296  							Properties: map[string]apiextensionsv1.JSONSchemaProps{
   297  								testApiextensionsOverlapProbeString: {Type: "boolean"},
   298  							},
   299  						},
   300  					},
   301  				},
   302  			},
   303  		},
   304  	}
   305  	etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
   306  
   307  	// Create a probe CRD bar that triggers an OpenAPI spec change
   308  	if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "bar"); err != nil {
   309  		t.Fatalf("unexpected error: %v", err)
   310  	}
   311  
   312  	// Expect the apiextensions definition to not change, since the overlapping definition will get renamed.
   313  	apiextensionsDefinition, exist, err := getOpenAPIDefinition(apiextensionsclient, `io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinition`)
   314  	if err != nil {
   315  		t.Fatalf("unexpected error: %v", err)
   316  	}
   317  	if !exist {
   318  		t.Fatalf("unexpected error: apiextensions definition doesn't exist")
   319  	}
   320  	bytes, err := json.Marshal(apiextensionsDefinition)
   321  	if err != nil {
   322  		t.Fatalf("unexpected error: %v", err)
   323  	}
   324  	if exist := strings.Contains(string(bytes), testApiextensionsOverlapProbeString); exist {
   325  		t.Fatalf("unexpected error: apiextensions definition gets overlapped")
   326  	}
   327  }
   328  
   329  // triggerSpecUpdateWithProbeCRD creates a probe CRD with suffix in name, and waits until
   330  // the path and definition for the probe CRD show up in the OpenAPI spec
   331  func triggerSpecUpdateWithProbeCRD(t *testing.T, apiextensionsclient *apiextensionsclientset.Clientset, suffix string) error {
   332  	// Create a probe CRD that triggers OpenAPI spec change
   333  	name := fmt.Sprintf("integration-test-%s-crd", suffix)
   334  	kind := fmt.Sprintf("Integration-test-%s-crd", suffix)
   335  	group := "probe.test.com"
   336  	crd := &apiextensionsv1.CustomResourceDefinition{
   337  		ObjectMeta: metav1.ObjectMeta{Name: name + "s." + group},
   338  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   339  			Group: group,
   340  			Scope: apiextensionsv1.ClusterScoped,
   341  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   342  				Plural:   name + "s",
   343  				Singular: name,
   344  				Kind:     kind,
   345  				ListKind: kind + "List",
   346  			},
   347  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
   348  				{
   349  					Name:    "v1",
   350  					Served:  true,
   351  					Storage: true,
   352  					Schema:  fixtures.AllowAllSchema(),
   353  				},
   354  			},
   355  		},
   356  	}
   357  	etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
   358  
   359  	// Expect the probe CRD path to show up in the OpenAPI spec
   360  	// TODO(roycaihw): expose response header in rest client and utilize etag here
   361  	if err := wait.Poll(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   362  		_, exist, err := getOpenAPIPath(apiextensionsclient, fmt.Sprintf(`/apis/%s/v1/%ss/{name}`, group, name))
   363  		if err != nil {
   364  			return false, err
   365  		}
   366  		return exist, nil
   367  	}); err != nil {
   368  		return fmt.Errorf("failed to observe probe CRD path in the spec: %v", err)
   369  	}
   370  	return nil
   371  }
   372  
   373  func specHasProbe(clientset *apiextensionsclientset.Clientset, probe string) (bool, error) {
   374  	bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
   375  	if err != nil {
   376  		return false, err
   377  	}
   378  	return strings.Contains(string(bs), probe), nil
   379  }
   380  
   381  func getOpenAPIPath(clientset *apiextensionsclientset.Clientset, path string) (spec.PathItem, bool, error) {
   382  	bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
   383  	if err != nil {
   384  		return spec.PathItem{}, false, err
   385  	}
   386  	s := spec.Swagger{}
   387  	if err := json.Unmarshal(bs, &s); err != nil {
   388  		return spec.PathItem{}, false, err
   389  	}
   390  	if s.SwaggerProps.Paths == nil {
   391  		return spec.PathItem{}, false, fmt.Errorf("unexpected empty path")
   392  	}
   393  	value, ok := s.SwaggerProps.Paths.Paths[path]
   394  	return value, ok, nil
   395  }
   396  
   397  func getOpenAPIDefinition(clientset *apiextensionsclientset.Clientset, definition string) (spec.Schema, bool, error) {
   398  	bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
   399  	if err != nil {
   400  		return spec.Schema{}, false, err
   401  	}
   402  	s := spec.Swagger{}
   403  	if err := json.Unmarshal(bs, &s); err != nil {
   404  		return spec.Schema{}, false, err
   405  	}
   406  	if s.SwaggerProps.Definitions == nil {
   407  		return spec.Schema{}, false, fmt.Errorf("unexpected empty path")
   408  	}
   409  	value, ok := s.SwaggerProps.Definitions[definition]
   410  	return value, ok, nil
   411  }
   412  
   413  // return the unique endpoint IPs
   414  func getEndpointIPs(endpoints *corev1.Endpoints) []string {
   415  	endpointMap := make(map[string]bool)
   416  	ips := make([]string, 0)
   417  	for _, subset := range endpoints.Subsets {
   418  		for _, address := range subset.Addresses {
   419  			if _, ok := endpointMap[address.IP]; !ok {
   420  				endpointMap[address.IP] = true
   421  				ips = append(ips, address.IP)
   422  			}
   423  		}
   424  	}
   425  	return ips
   426  }
   427  
   428  func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool {
   429  	listenAddresses := make([]string, 0)
   430  	for _, server := range servers {
   431  		listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String())
   432  	}
   433  	return reflect.DeepEqual(listenAddresses, ips)
   434  }
   435  
   436  func testReconcilersAPIServerLease(t *testing.T, leaseCount int, apiServerCount int) {
   437  	var leaseServers = make([]*kubeapiservertesting.TestServer, leaseCount)
   438  	var apiServerCountServers = make([]*kubeapiservertesting.TestServer, apiServerCount)
   439  	etcd := framework.SharedEtcd()
   440  
   441  	instanceOptions := kubeapiservertesting.NewDefaultTestServerOptions()
   442  
   443  	wg := sync.WaitGroup{}
   444  	// 1. start apiServerCount api servers
   445  	for i := 0; i < apiServerCount; i++ {
   446  		// start count api server
   447  		wg.Add(1)
   448  		go func(i int) {
   449  			defer wg.Done()
   450  			server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
   451  				"--endpoint-reconciler-type", "master-count",
   452  				"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
   453  				"--apiserver-count", fmt.Sprintf("%v", apiServerCount),
   454  			}, etcd)
   455  			apiServerCountServers[i] = server
   456  		}(i)
   457  	}
   458  	wg.Wait()
   459  
   460  	// 2. verify API Server count servers have registered
   461  	if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
   462  		client, err := kubernetes.NewForConfig(apiServerCountServers[0].ClientConfig)
   463  		if err != nil {
   464  			t.Logf("error creating client: %v", err)
   465  			return false, nil
   466  		}
   467  		endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
   468  		if err != nil {
   469  			t.Logf("error fetching endpoints: %v", err)
   470  			return false, nil
   471  		}
   472  		return verifyEndpointsWithIPs(apiServerCountServers, getEndpointIPs(endpoints)), nil
   473  	}); err != nil {
   474  		t.Fatalf("API Server count endpoints failed to register: %v", err)
   475  	}
   476  
   477  	// 3. start lease api servers
   478  	for i := 0; i < leaseCount; i++ {
   479  		wg.Add(1)
   480  		go func(i int) {
   481  			defer wg.Done()
   482  			options := []string{
   483  				"--endpoint-reconciler-type", "lease",
   484  				"--advertise-address", fmt.Sprintf("10.0.1.%v", i+10),
   485  			}
   486  			server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd)
   487  			leaseServers[i] = server
   488  		}(i)
   489  	}
   490  	wg.Wait()
   491  	defer func() {
   492  		for i := 0; i < leaseCount; i++ {
   493  			leaseServers[i].TearDownFn()
   494  		}
   495  	}()
   496  
   497  	time.Sleep(3 * time.Second)
   498  
   499  	// 4. Shutdown the apiServerCount server
   500  	for _, server := range apiServerCountServers {
   501  		server.TearDownFn()
   502  	}
   503  
   504  	// 5. verify only leaseEndpoint servers left
   505  	if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
   506  		client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig)
   507  		if err != nil {
   508  			t.Logf("create client error: %v", err)
   509  			return false, nil
   510  		}
   511  		endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
   512  		if err != nil {
   513  			t.Logf("error fetching endpoints: %v", err)
   514  			return false, nil
   515  		}
   516  		return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil
   517  	}); err != nil {
   518  		t.Fatalf("did not find only lease endpoints: %v", err)
   519  	}
   520  }
   521  
   522  func TestReconcilerAPIServerLeaseCombined(t *testing.T) {
   523  	testReconcilersAPIServerLease(t, 1, 2)
   524  }
   525  
   526  func TestReconcilerAPIServerLeaseMultiMoreAPIServers(t *testing.T) {
   527  	testReconcilersAPIServerLease(t, 2, 1)
   528  }
   529  
   530  func TestReconcilerAPIServerLeaseMultiCombined(t *testing.T) {
   531  	testReconcilersAPIServerLease(t, 2, 2)
   532  }
   533  
   534  func TestMultiAPIServerNodePortAllocation(t *testing.T) {
   535  	var kubeAPIServers []*kubeapiservertesting.TestServer
   536  	var clientAPIServers []*kubernetes.Clientset
   537  	etcd := framework.SharedEtcd()
   538  
   539  	instanceOptions := kubeapiservertesting.NewDefaultTestServerOptions()
   540  
   541  	// create 2 api servers and 2 clients
   542  	for i := 0; i < 2; i++ {
   543  		// start count api server
   544  		t.Logf("starting api server: %d", i)
   545  		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
   546  			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
   547  		}, etcd)
   548  		kubeAPIServers = append(kubeAPIServers, server)
   549  
   550  		// verify kube API servers have registered and create a client
   551  		if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
   552  			client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
   553  			if err != nil {
   554  				t.Logf("create client error: %v", err)
   555  				return false, nil
   556  			}
   557  			clientAPIServers = append(clientAPIServers, client)
   558  			endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
   559  			if err != nil {
   560  				t.Logf("error fetching endpoints: %v", err)
   561  				return false, nil
   562  			}
   563  			return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
   564  		}); err != nil {
   565  			t.Fatalf("did not find only lease endpoints: %v", err)
   566  		}
   567  	}
   568  
   569  	serviceObject := &corev1.Service{
   570  		ObjectMeta: metav1.ObjectMeta{
   571  			Labels: map[string]string{"foo": "bar"},
   572  			Name:   "test-node-port",
   573  		},
   574  		Spec: corev1.ServiceSpec{
   575  			Ports: []corev1.ServicePort{
   576  				{
   577  					Name:       "nodeport-test",
   578  					Port:       443,
   579  					TargetPort: intstr.IntOrString{IntVal: 443},
   580  					NodePort:   32080,
   581  					Protocol:   "TCP",
   582  				},
   583  			},
   584  			Type:     "NodePort",
   585  			Selector: map[string]string{"foo": "bar"},
   586  		},
   587  	}
   588  
   589  	// create and delete the same nodePortservice using different APIservers
   590  	// to check that API servers are using the same port allocation bitmap
   591  	for i := 0; i < 2; i++ {
   592  		// Create the service using the first API server
   593  		_, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{})
   594  		if err != nil {
   595  			t.Fatalf("unable to create service: %v", err)
   596  		}
   597  		// Delete the service using the second API server
   598  		if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
   599  			t.Fatalf("got unexpected error: %v", err)
   600  		}
   601  	}
   602  
   603  	// shutdown the api servers
   604  	for _, server := range kubeAPIServers {
   605  		server.TearDownFn()
   606  	}
   607  
   608  }
   609  

View as plain text