...

Source file src/k8s.io/kubernetes/test/integration/apiserver/peerproxy/peer_proxy_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/peerproxy

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package peerproxy
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"testing"
    23  	"time"
    24  
    25  	"github.com/stretchr/testify/assert"
    26  	"github.com/stretchr/testify/require"
    27  	v1 "k8s.io/api/batch/v1"
    28  	corev1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	"k8s.io/apiserver/pkg/features"
    32  	"k8s.io/apiserver/pkg/server"
    33  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    34  	"k8s.io/client-go/informers"
    35  	"k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/transport"
    37  	"k8s.io/client-go/util/cert"
    38  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    39  	"k8s.io/klog/v2"
    40  	kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    41  	"k8s.io/kubernetes/pkg/controller/storageversiongc"
    42  	"k8s.io/kubernetes/pkg/controlplane"
    43  	kubefeatures "k8s.io/kubernetes/pkg/features"
    44  
    45  	"k8s.io/kubernetes/test/integration/framework"
    46  	testutil "k8s.io/kubernetes/test/utils"
    47  	"k8s.io/kubernetes/test/utils/ktesting"
    48  )
    49  
    50  func TestPeerProxiedRequest(t *testing.T) {
    51  
    52  	ktesting.SetDefaultVerbosity(1)
    53  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    54  	t.Cleanup(cancel)
    55  
    56  	// ensure to stop cert reloading after shutdown
    57  	transport.DialerStopCh = ctx.Done()
    58  
    59  	// enable feature flags
    60  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
    61  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
    62  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
    63  
    64  	// create sharedetcd
    65  	etcd := framework.SharedEtcd()
    66  
    67  	// create certificates for aggregation and client-cert auth
    68  	proxyCA, err := createProxyCertContent()
    69  	require.NoError(t, err)
    70  
    71  	// start test server with all APIs enabled
    72  	// override hostname to ensure unique ips
    73  	server.SetHostnameFuncForTests("test-server-a")
    74  	serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
    75  		EnableCertAuth: true,
    76  		ProxyCA:        &proxyCA},
    77  		[]string{}, etcd)
    78  	defer serverA.TearDownFn()
    79  
    80  	// start another test server with some api disabled
    81  	// override hostname to ensure unique ips
    82  	server.SetHostnameFuncForTests("test-server-b")
    83  	serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
    84  		EnableCertAuth: true,
    85  		ProxyCA:        &proxyCA},
    86  		[]string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd)
    87  	defer serverB.TearDownFn()
    88  
    89  	kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
    90  	require.NoError(t, err)
    91  
    92  	kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
    93  	require.NoError(t, err)
    94  
    95  	// create jobs resource using serverA
    96  	job := createJobResource()
    97  	_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
    98  	require.NoError(t, err)
    99  
   100  	klog.Infof("\nServerA has created jobs\n")
   101  
   102  	// List jobs using ServerB
   103  	// This request should be proxied to ServerA since ServerB does not have batch API enabled
   104  	jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
   105  	klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
   106  	require.NoError(t, err)
   107  	assert.NotEmpty(t, jobsB)
   108  	assert.Equal(t, job.Name, jobsB.Items[0].Name)
   109  }
   110  
   111  func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
   112  
   113  	ktesting.SetDefaultVerbosity(1)
   114  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
   115  	t.Cleanup(cancel)
   116  
   117  	// ensure to stop cert reloading after shutdown
   118  	transport.DialerStopCh = ctx.Done()
   119  
   120  	// enable feature flags
   121  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
   122  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
   123  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
   124  
   125  	// create sharedetcd
   126  	etcd := framework.SharedEtcd()
   127  
   128  	// create certificates for aggregation and client-cert auth
   129  	proxyCA, err := createProxyCertContent()
   130  	require.NoError(t, err)
   131  
   132  	// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated
   133  	// once it is shutdown
   134  	controlplane.IdentityLeaseDurationSeconds = 10
   135  	controlplane.IdentityLeaseGCPeriod = time.Second
   136  	controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second
   137  
   138  	// start serverA with all APIs enabled
   139  	// override hostname to ensure unique ips
   140  	server.SetHostnameFuncForTests("test-server-a")
   141  	serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
   142  	kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
   143  	require.NoError(t, err)
   144  	// ensure storageversion garbage collector ctlr is set up
   145  	informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
   146  	setupStorageVersionGC(ctx, kubeClientSetA, informersA)
   147  	// reset lease duration to default value for serverB and serverC since we will not be
   148  	// shutting these down
   149  	controlplane.IdentityLeaseDurationSeconds = 3600
   150  
   151  	// start serverB with some api disabled
   152  	// override hostname to ensure unique ips
   153  	server.SetHostnameFuncForTests("test-server-b")
   154  	serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{
   155  		fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd)
   156  	defer serverB.TearDownFn()
   157  	kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
   158  	require.NoError(t, err)
   159  	// ensure storageversion garbage collector ctlr is set up
   160  	informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
   161  	setupStorageVersionGC(ctx, kubeClientSetB, informersB)
   162  
   163  	// start serverC with all APIs enabled
   164  	// override hostname to ensure unique ips
   165  	server.SetHostnameFuncForTests("test-server-c")
   166  	serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
   167  	defer serverC.TearDownFn()
   168  
   169  	// create jobs resource using serverA
   170  	job := createJobResource()
   171  	_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
   172  	require.NoError(t, err)
   173  	klog.Infof("\nServerA has created jobs\n")
   174  
   175  	// shutdown serverA
   176  	serverA.TearDownFn()
   177  
   178  	var jobsB *v1.JobList
   179  	// list jobs using ServerB which it should proxy to ServerC and get back valid response
   180  	err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
   181  		jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
   182  		if err != nil {
   183  			return false, nil
   184  		}
   185  		if jobsB != nil {
   186  			return true, nil
   187  		}
   188  		return false, nil
   189  	})
   190  	klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
   191  	require.NoError(t, err)
   192  	assert.NotEmpty(t, jobsB)
   193  	assert.Equal(t, job.Name, jobsB.Items[0].Name)
   194  }
   195  
   196  func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
   197  	leaseInformer := informers.Coordination().V1().Leases()
   198  	storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
   199  	go leaseInformer.Informer().Run(ctx.Done())
   200  	go storageVersionInformer.Informer().Run(ctx.Done())
   201  
   202  	controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
   203  	go controller.Run(ctx)
   204  }
   205  
   206  func createProxyCertContent() (kastesting.ProxyCA, error) {
   207  	result := kastesting.ProxyCA{}
   208  	proxySigningKey, err := testutil.NewPrivateKey()
   209  	if err != nil {
   210  		return result, err
   211  	}
   212  	proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
   213  	if err != nil {
   214  		return result, err
   215  	}
   216  
   217  	result = kastesting.ProxyCA{
   218  		ProxySigningCert: proxySigningCert,
   219  		ProxySigningKey:  proxySigningKey,
   220  	}
   221  	return result, nil
   222  }
   223  
   224  func createJobResource() *v1.Job {
   225  	return &v1.Job{
   226  		ObjectMeta: metav1.ObjectMeta{
   227  			Name:      "test-job",
   228  			Namespace: "default",
   229  		},
   230  		Spec: v1.JobSpec{
   231  			Template: corev1.PodTemplateSpec{
   232  				Spec: corev1.PodSpec{
   233  					Containers: []corev1.Container{
   234  						{
   235  							Name:  "test",
   236  							Image: "test",
   237  						},
   238  					},
   239  					RestartPolicy: corev1.RestartPolicyNever,
   240  				},
   241  			},
   242  		},
   243  	}
   244  }
   245  

View as plain text