...

Source file src/k8s.io/kubernetes/cmd/kube-apiserver/app/testing/testserver.go

Documentation: k8s.io/kubernetes/cmd/kube-apiserver/app/testing

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testing
    18  
    19  import (
    20  	"context"
    21  	"crypto/ecdsa"
    22  	"crypto/elliptic"
    23  	"crypto/rand"
    24  	"crypto/rsa"
    25  	"crypto/x509"
    26  	"crypto/x509/pkix"
    27  	"encoding/pem"
    28  	"fmt"
    29  	"math"
    30  	"math/big"
    31  	"net"
    32  	"os"
    33  	"path/filepath"
    34  	"runtime"
    35  	"time"
    36  
    37  	"github.com/spf13/pflag"
    38  	"go.etcd.io/etcd/client/pkg/v3/transport"
    39  	clientv3 "go.etcd.io/etcd/client/v3"
    40  	"google.golang.org/grpc"
    41  
    42  	"k8s.io/apimachinery/pkg/api/errors"
    43  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    44  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    45  	"k8s.io/apimachinery/pkg/util/wait"
    46  	serveroptions "k8s.io/apiserver/pkg/server/options"
    47  	"k8s.io/apiserver/pkg/storage/storagebackend"
    48  	"k8s.io/apiserver/pkg/storageversion"
    49  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    50  	"k8s.io/client-go/kubernetes"
    51  	restclient "k8s.io/client-go/rest"
    52  	clientgotransport "k8s.io/client-go/transport"
    53  	"k8s.io/client-go/util/cert"
    54  	"k8s.io/client-go/util/keyutil"
    55  	logsapi "k8s.io/component-base/logs/api/v1"
    56  	"k8s.io/klog/v2"
    57  	"k8s.io/kube-aggregator/pkg/apiserver"
    58  	"k8s.io/kubernetes/pkg/features"
    59  
    60  	"k8s.io/kubernetes/cmd/kube-apiserver/app"
    61  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    62  	testutil "k8s.io/kubernetes/test/utils"
    63  )
    64  
    65  func init() {
    66  	// If instantiated more than once or together with other servers, the
    67  	// servers would try to modify the global logging state. This must get
    68  	// ignored during testing.
    69  	logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
    70  }
    71  
    72  // This key is for testing purposes only and is not considered secure.
    73  const ecdsaPrivateKey = `-----BEGIN EC PRIVATE KEY-----
    74  MHcCAQEEIEZmTmUhuanLjPA2CLquXivuwBDHTt5XYwgIr/kA1LtRoAoGCCqGSM49
    75  AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
    76  /IR3qCXyThP/dbCiHrF3v1cuhBOHY8CLVg==
    77  -----END EC PRIVATE KEY-----`
    78  
    79  // TearDownFunc is to be called to tear down a test server.
    80  type TearDownFunc func()
    81  
    82  // TestServerInstanceOptions Instance options the TestServer
    83  type TestServerInstanceOptions struct {
    84  	// SkipHealthzCheck returns without waiting for the server to become healthy.
    85  	// Useful for testing server configurations expected to prevent /healthz from completing.
    86  	SkipHealthzCheck bool
    87  	// Enable cert-auth for the kube-apiserver
    88  	EnableCertAuth bool
    89  	// Wrap the storage version interface of the created server's generic server.
    90  	StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager
    91  	// CA file used for requestheader authn during communication between:
    92  	// 1. kube-apiserver and peer when the local apiserver is not able to serve the request due
    93  	// to version skew
    94  	// 2. kube-apiserver and aggregated apiserver
    95  
    96  	// We specify this as on option to pass a common proxyCA to multiple apiservers to simulate
    97  	// an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections.
    98  	ProxyCA *ProxyCA
    99  }
   100  
   101  // TestServer return values supplied by kube-test-ApiServer
   102  type TestServer struct {
   103  	ClientConfig      *restclient.Config        // Rest client config
   104  	ServerOpts        *options.ServerRunOptions // ServerOpts
   105  	TearDownFn        TearDownFunc              // TearDown function
   106  	TmpDir            string                    // Temp Dir used, by the apiserver
   107  	EtcdClient        *clientv3.Client          // used by tests that need to check data migrated from APIs that are no longer served
   108  	EtcdStoragePrefix string                    // storage prefix in etcd
   109  }
   110  
   111  // Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
   112  type Logger interface {
   113  	Helper()
   114  	Errorf(format string, args ...interface{})
   115  	Fatalf(format string, args ...interface{})
   116  	Logf(format string, args ...interface{})
   117  	Cleanup(func())
   118  }
   119  
   120  // ProxyCA contains the certificate authority certificate and key which is used to verify client connections
   121  // to kube-apiservers. The clients can be :
   122  // 1. aggregated apiservers
   123  // 2. peer kube-apiservers
   124  type ProxyCA struct {
   125  	ProxySigningCert *x509.Certificate
   126  	ProxySigningKey  *rsa.PrivateKey
   127  }
   128  
   129  // NewDefaultTestServerOptions Default options for TestServer instances
   130  func NewDefaultTestServerOptions() *TestServerInstanceOptions {
   131  	return &TestServerInstanceOptions{
   132  		EnableCertAuth: true,
   133  	}
   134  }
   135  
   136  // StartTestServer starts a etcd server and kube-apiserver. A rest client config and a tear-down func,
   137  // and location of the tmpdir are returned.
   138  //
   139  // Note: we return a tear-down func instead of a stop channel because the later will leak temporary
   140  // files that because Golang testing's call to os.Exit will not give a stop channel go routine
   141  // enough time to remove temporary files.
   142  func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
   143  	if instanceOptions == nil {
   144  		instanceOptions = NewDefaultTestServerOptions()
   145  	}
   146  
   147  	result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver")
   148  	if err != nil {
   149  		return result, fmt.Errorf("failed to create temp dir: %v", err)
   150  	}
   151  
   152  	stopCh := make(chan struct{})
   153  	var errCh chan error
   154  	tearDown := func() {
   155  		// Closing stopCh is stopping apiserver and cleaning up
   156  		// after itself, including shutting down its storage layer.
   157  		close(stopCh)
   158  
   159  		// If the apiserver was started, let's wait for it to
   160  		// shutdown clearly.
   161  		if errCh != nil {
   162  			err, ok := <-errCh
   163  			if ok && err != nil {
   164  				klog.Errorf("Failed to shutdown test server clearly: %v", err)
   165  			}
   166  		}
   167  		os.RemoveAll(result.TmpDir)
   168  	}
   169  	defer func() {
   170  		if result.TearDownFn == nil {
   171  			tearDown()
   172  		}
   173  	}()
   174  
   175  	fs := pflag.NewFlagSet("test", pflag.PanicOnError)
   176  
   177  	s := options.NewServerRunOptions()
   178  	for _, f := range s.Flags().FlagSets {
   179  		fs.AddFlagSet(f)
   180  	}
   181  
   182  	s.SecureServing.Listener, s.SecureServing.BindPort, err = createLocalhostListenerOnFreePort()
   183  	if err != nil {
   184  		return result, fmt.Errorf("failed to create listener: %v", err)
   185  	}
   186  	s.SecureServing.ServerCert.CertDirectory = result.TmpDir
   187  
   188  	if instanceOptions.EnableCertAuth {
   189  		// set up default headers for request header auth
   190  		reqHeaders := serveroptions.NewDelegatingAuthenticationOptions()
   191  		s.Authentication.RequestHeader = &reqHeaders.RequestHeader
   192  
   193  		var proxySigningKey *rsa.PrivateKey
   194  		var proxySigningCert *x509.Certificate
   195  
   196  		if instanceOptions.ProxyCA != nil {
   197  			// use provided proxyCA
   198  			proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey
   199  			proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert
   200  
   201  		} else {
   202  			// create certificates for aggregation and client-cert auth
   203  			proxySigningKey, err = testutil.NewPrivateKey()
   204  			if err != nil {
   205  				return result, err
   206  			}
   207  			proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
   208  			if err != nil {
   209  				return result, err
   210  			}
   211  		}
   212  		proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt")
   213  		if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
   214  			return result, err
   215  		}
   216  		s.Authentication.RequestHeader.ClientCAFile = proxyCACertFile
   217  
   218  		// give the kube api server an "identity" it can use to for request header auth
   219  		// so that aggregated api servers can understand who the calling user is
   220  		s.Authentication.RequestHeader.AllowedNames = []string{"ash", "misty", "brock"}
   221  
   222  		// create private key
   223  		signer, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
   224  		if err != nil {
   225  			return result, err
   226  		}
   227  
   228  		// make a client certificate for the api server - common name has to match one of our defined names above
   229  		serial, err := rand.Int(rand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
   230  		if err != nil {
   231  			return result, err
   232  		}
   233  		serial = new(big.Int).Add(serial, big.NewInt(1))
   234  		tenThousandHoursLater := time.Now().Add(10_000 * time.Hour)
   235  		certTmpl := x509.Certificate{
   236  			Subject: pkix.Name{
   237  				CommonName: "misty",
   238  			},
   239  			SerialNumber: serial,
   240  			NotBefore:    proxySigningCert.NotBefore,
   241  			NotAfter:     tenThousandHoursLater,
   242  			KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
   243  			ExtKeyUsage: []x509.ExtKeyUsage{
   244  				x509.ExtKeyUsageClientAuth,
   245  			},
   246  			BasicConstraintsValid: true,
   247  		}
   248  		certDERBytes, err := x509.CreateCertificate(rand.Reader, &certTmpl, proxySigningCert, signer.Public(), proxySigningKey)
   249  		if err != nil {
   250  			return result, err
   251  		}
   252  		clientCrtOfAPIServer, err := x509.ParseCertificate(certDERBytes)
   253  		if err != nil {
   254  			return result, err
   255  		}
   256  
   257  		// write the cert to disk
   258  		certificatePath := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.crt")
   259  		certBlock := pem.Block{
   260  			Type:  "CERTIFICATE",
   261  			Bytes: clientCrtOfAPIServer.Raw,
   262  		}
   263  		certBytes := pem.EncodeToMemory(&certBlock)
   264  		if err := cert.WriteCert(certificatePath, certBytes); err != nil {
   265  			return result, err
   266  		}
   267  
   268  		// write the key to disk
   269  		privateKeyPath := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.key")
   270  		encodedPrivateKey, err := keyutil.MarshalPrivateKeyToPEM(signer)
   271  		if err != nil {
   272  			return result, err
   273  		}
   274  		if err := keyutil.WriteKey(privateKeyPath, encodedPrivateKey); err != nil {
   275  			return result, err
   276  		}
   277  
   278  		s.ProxyClientKeyFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.key")
   279  		s.ProxyClientCertFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.crt")
   280  
   281  		clientSigningKey, err := testutil.NewPrivateKey()
   282  		if err != nil {
   283  			return result, err
   284  		}
   285  		clientSigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "client-ca"}, clientSigningKey)
   286  		if err != nil {
   287  			return result, err
   288  		}
   289  		clientCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "client-ca.crt")
   290  		if err := os.WriteFile(clientCACertFile, testutil.EncodeCertPEM(clientSigningCert), 0644); err != nil {
   291  			return result, err
   292  		}
   293  		s.Authentication.ClientCert.ClientCA = clientCACertFile
   294  		if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
   295  			// TODO: set up a general clean up for testserver
   296  			if clientgotransport.DialerStopCh == wait.NeverStop {
   297  				ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
   298  				t.Cleanup(cancel)
   299  				clientgotransport.DialerStopCh = ctx.Done()
   300  			}
   301  			s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt")
   302  		}
   303  	}
   304  
   305  	s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device
   306  
   307  	pkgPath, err := pkgPath(t)
   308  	if err != nil {
   309  		return result, err
   310  	}
   311  	s.SecureServing.ServerCert.FixtureDirectory = filepath.Join(pkgPath, "testdata")
   312  
   313  	s.ServiceClusterIPRanges = "10.0.0.0/16"
   314  	s.Etcd.StorageConfig = *storageConfig
   315  	s.APIEnablement.RuntimeConfig.Set("api/all=true")
   316  
   317  	if err := fs.Parse(customFlags); err != nil {
   318  		return result, err
   319  	}
   320  
   321  	saSigningKeyFile, err := os.CreateTemp("/tmp", "insecure_test_key")
   322  	if err != nil {
   323  		t.Fatalf("create temp file failed: %v", err)
   324  	}
   325  	defer os.RemoveAll(saSigningKeyFile.Name())
   326  	if err = os.WriteFile(saSigningKeyFile.Name(), []byte(ecdsaPrivateKey), 0666); err != nil {
   327  		t.Fatalf("write file %s failed: %v", saSigningKeyFile.Name(), err)
   328  	}
   329  	s.ServiceAccountSigningKeyFile = saSigningKeyFile.Name()
   330  	s.Authentication.ServiceAccounts.Issuers = []string{"https://foo.bar.example.com"}
   331  	s.Authentication.ServiceAccounts.KeyFiles = []string{saSigningKeyFile.Name()}
   332  
   333  	completedOptions, err := s.Complete()
   334  	if err != nil {
   335  		return result, fmt.Errorf("failed to set default ServerRunOptions: %v", err)
   336  	}
   337  
   338  	if errs := completedOptions.Validate(); len(errs) != 0 {
   339  		return result, fmt.Errorf("failed to validate ServerRunOptions: %v", utilerrors.NewAggregate(errs))
   340  	}
   341  
   342  	t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
   343  	t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
   344  
   345  	config, err := app.NewConfig(completedOptions)
   346  	if err != nil {
   347  		return result, err
   348  	}
   349  	completed, err := config.Complete()
   350  	if err != nil {
   351  		return result, err
   352  	}
   353  	server, err := app.CreateServerChain(completed)
   354  	if err != nil {
   355  		return result, fmt.Errorf("failed to create server chain: %v", err)
   356  	}
   357  	if instanceOptions.StorageVersionWrapFunc != nil {
   358  		server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager)
   359  	}
   360  
   361  	errCh = make(chan error)
   362  	go func(stopCh <-chan struct{}) {
   363  		defer close(errCh)
   364  		prepared, err := server.PrepareRun()
   365  		if err != nil {
   366  			errCh <- err
   367  		} else if err := prepared.Run(stopCh); err != nil {
   368  			errCh <- err
   369  		}
   370  	}(stopCh)
   371  
   372  	client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
   373  	if err != nil {
   374  		return result, fmt.Errorf("failed to create a client: %v", err)
   375  	}
   376  
   377  	if !instanceOptions.SkipHealthzCheck {
   378  		t.Logf("Waiting for /healthz to be ok...")
   379  
   380  		// wait until healthz endpoint returns ok
   381  		err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
   382  			select {
   383  			case err := <-errCh:
   384  				return false, err
   385  			default:
   386  			}
   387  
   388  			req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
   389  			// The storage version bootstrap test wraps the storage version post-start
   390  			// hook, so the hook won't become health when the server bootstraps
   391  			if instanceOptions.StorageVersionWrapFunc != nil {
   392  				// We hardcode the param instead of having a new instanceOptions field
   393  				// to avoid confusing users with more options.
   394  				storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
   395  				req.Param("exclude", storageVersionCheck)
   396  			}
   397  			result := req.Do(context.TODO())
   398  			status := 0
   399  			result.StatusCode(&status)
   400  			if status == 200 {
   401  				return true, nil
   402  			}
   403  			return false, nil
   404  		})
   405  		if err != nil {
   406  			return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
   407  		}
   408  	}
   409  
   410  	// wait until default namespace is created
   411  	err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
   412  		select {
   413  		case err := <-errCh:
   414  			return false, err
   415  		default:
   416  		}
   417  
   418  		if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
   419  			if !errors.IsNotFound(err) {
   420  				t.Logf("Unable to get default namespace: %v", err)
   421  			}
   422  			return false, nil
   423  		}
   424  		return true, nil
   425  	})
   426  	if err != nil {
   427  		return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
   428  	}
   429  
   430  	tlsInfo := transport.TLSInfo{
   431  		CertFile:      storageConfig.Transport.CertFile,
   432  		KeyFile:       storageConfig.Transport.KeyFile,
   433  		TrustedCAFile: storageConfig.Transport.TrustedCAFile,
   434  	}
   435  	tlsConfig, err := tlsInfo.ClientConfig()
   436  	if err != nil {
   437  		return result, err
   438  	}
   439  	etcdConfig := clientv3.Config{
   440  		Endpoints:   storageConfig.Transport.ServerList,
   441  		DialTimeout: 20 * time.Second,
   442  		DialOptions: []grpc.DialOption{
   443  			grpc.WithBlock(), // block until the underlying connection is up
   444  		},
   445  		TLS: tlsConfig,
   446  	}
   447  	etcdClient, err := clientv3.New(etcdConfig)
   448  	if err != nil {
   449  		return result, err
   450  	}
   451  
   452  	// from here the caller must call tearDown
   453  	result.ClientConfig = restclient.CopyConfig(server.GenericAPIServer.LoopbackClientConfig)
   454  	result.ClientConfig.QPS = 1000
   455  	result.ClientConfig.Burst = 10000
   456  	result.ServerOpts = s
   457  	result.TearDownFn = func() {
   458  		tearDown()
   459  		etcdClient.Close()
   460  	}
   461  	result.EtcdClient = etcdClient
   462  	result.EtcdStoragePrefix = storageConfig.Prefix
   463  
   464  	return result, nil
   465  }
   466  
   467  // StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
   468  func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
   469  	result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
   470  	if err == nil {
   471  		return &result
   472  	}
   473  
   474  	t.Fatalf("failed to launch server: %v", err)
   475  	return nil
   476  }
   477  
   478  func createLocalhostListenerOnFreePort() (net.Listener, int, error) {
   479  	ln, err := net.Listen("tcp", "127.0.0.1:0")
   480  	if err != nil {
   481  		return nil, 0, err
   482  	}
   483  
   484  	// get port
   485  	tcpAddr, ok := ln.Addr().(*net.TCPAddr)
   486  	if !ok {
   487  		ln.Close()
   488  		return nil, 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
   489  	}
   490  
   491  	return ln, tcpAddr.Port, nil
   492  }
   493  
   494  // pkgPath returns the absolute file path to this package's directory. With go
   495  // test, we can just look at the runtime call stack. However, bazel compiles go
   496  // binaries with the -trimpath option so the simple approach fails however we
   497  // can consult environment variables to derive the path.
   498  //
   499  // The approach taken here works for both go test and bazel on the assumption
   500  // that if and only if trimpath is passed, we are running under bazel.
   501  func pkgPath(t Logger) (string, error) {
   502  	_, thisFile, _, ok := runtime.Caller(0)
   503  	if !ok {
   504  		return "", fmt.Errorf("failed to get current file")
   505  	}
   506  
   507  	pkgPath := filepath.Dir(thisFile)
   508  
   509  	// If we find bazel env variables, then -trimpath was passed so we need to
   510  	// construct the path from the environment.
   511  	if testSrcdir, testWorkspace := os.Getenv("TEST_SRCDIR"), os.Getenv("TEST_WORKSPACE"); testSrcdir != "" && testWorkspace != "" {
   512  		t.Logf("Detected bazel env varaiables: TEST_SRCDIR=%q TEST_WORKSPACE=%q", testSrcdir, testWorkspace)
   513  		pkgPath = filepath.Join(testSrcdir, testWorkspace, pkgPath)
   514  	}
   515  
   516  	// If the path is still not absolute, something other than bazel compiled
   517  	// with -trimpath.
   518  	if !filepath.IsAbs(pkgPath) {
   519  		return "", fmt.Errorf("can't construct an absolute path from %q", pkgPath)
   520  	}
   521  
   522  	t.Logf("Resolved testserver package path to: %q", pkgPath)
   523  
   524  	return pkgPath, nil
   525  }
   526  

View as plain text