...

Source file src/k8s.io/kubernetes/test/integration/storageversionmigrator/util.go

Documentation: k8s.io/kubernetes/test/integration/storageversionmigrator

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storageversionmigrator
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"crypto/x509"
    23  	"encoding/pem"
    24  	"fmt"
    25  	"net"
    26  	"net/http"
    27  	"os"
    28  	"path/filepath"
    29  	"regexp"
    30  	"strconv"
    31  	"strings"
    32  	"testing"
    33  	"time"
    34  
    35  	clientv3 "go.etcd.io/etcd/client/v3"
    36  
    37  	corev1 "k8s.io/api/core/v1"
    38  	svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
    39  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    40  	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    41  	crdintegration "k8s.io/apiextensions-apiserver/test/integration"
    42  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    43  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    44  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme"
    45  	"k8s.io/apimachinery/pkg/runtime/schema"
    46  	"k8s.io/apimachinery/pkg/util/wait"
    47  	auditinternal "k8s.io/apiserver/pkg/apis/audit"
    48  	auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
    49  	"k8s.io/apiserver/pkg/storage/storagebackend"
    50  	"k8s.io/client-go/discovery"
    51  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    52  	"k8s.io/client-go/dynamic"
    53  	"k8s.io/client-go/informers"
    54  	clientset "k8s.io/client-go/kubernetes"
    55  	"k8s.io/client-go/metadata"
    56  	"k8s.io/client-go/metadata/metadatainformer"
    57  	"k8s.io/client-go/rest"
    58  	"k8s.io/client-go/restmapper"
    59  	"k8s.io/client-go/util/cert"
    60  	"k8s.io/client-go/util/keyutil"
    61  	utiltesting "k8s.io/client-go/util/testing"
    62  	"k8s.io/controller-manager/pkg/informerfactory"
    63  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    64  	"k8s.io/kubernetes/cmd/kube-controller-manager/names"
    65  	"k8s.io/kubernetes/pkg/controller/garbagecollector"
    66  	"k8s.io/kubernetes/pkg/controller/storageversionmigrator"
    67  	"k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter"
    68  	"k8s.io/kubernetes/test/integration"
    69  	"k8s.io/kubernetes/test/integration/etcd"
    70  	"k8s.io/kubernetes/test/integration/framework"
    71  	"k8s.io/kubernetes/test/utils"
    72  	utilnet "k8s.io/utils/net"
    73  	"k8s.io/utils/ptr"
    74  )
    75  
// Fixture names and values shared by the storage version migrator integration
// test helpers in this file:
//   - secretKey/secretVal/secretName/triggerSecretName describe the secrets the
//     helpers create (secretVal is a fake value).
//   - svmName/secondSVMName are names for StorageVersionMigration objects;
//     svmName is also used as the prefix of the encryption config temp dir.
//   - auditPolicyFileName/auditLogFileName/encryptionConfigFileName are the
//     file name patterns used for the files handed to the API server.
//   - metricPrefix is the metric name searched for in the /metrics output.
//   - crdName/crdGroup identify the custom resource used by the CR helpers.
//   - servicePort/webhookHandler are the port and URL path of the local
//     conversion webhook.
const (
	secretKey                = "api_key"
	secretVal                = "086a7ffc-0225-11e8-ba89-0ed5f89f718b" // Fake value for testing.
	secretName               = "test-secret"
	triggerSecretName        = "trigger-for-svm"
	svmName                  = "test-svm"
	secondSVMName            = "second-test-svm"
	auditPolicyFileName      = "audit-policy.yaml"
	auditLogFileName         = "audit.log"
	encryptionConfigFileName = "encryption.conf"
	metricPrefix             = "apiserver_encryption_config_controller_automatic_reload_success_total"
	defaultNamespace         = "default"
	crdName                  = "testcrd"
	crdGroup                 = "stable.example.com"
	servicePort              = int32(9443)
	webhookHandler           = "crdconvert"
)
    93  
    94  var (
	// resources holds the raw YAML documents the helpers write to disk, keyed
	// by purpose:
	//   - "auditPolicy": audit policy with a Metadata-level rule for "patch"
	//     requests on secrets; the RequestReceived stage is omitted.
	//   - "initialEncryptionConfig": encryption configuration for secrets with
	//     a single aescbc provider holding key1.
	//   - "updatedEncryptionConfig": encryption configuration for secrets that
	//     lists an aescbc provider with key2 ahead of the one with key1.
	resources = map[string]string{
		"auditPolicy": `
apiVersion: audit.k8s.io/v1
kind: Policy
omitStages:
  - "RequestReceived"
rules:
  - level: Metadata
    resources:
    - group: ""
      resources: ["secrets"]
    verbs: ["patch"]
`,
		"initialEncryptionConfig": `
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
  - resources:
    - secrets
    providers:
    - aescbc:
        keys:
        - name: key1
          secret: c2VjcmV0IGlzIHNlY3VyZQ==
`,
		"updatedEncryptionConfig": `
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
  - resources:
    - secrets
    providers:
    - aescbc:
        keys:
        - name: key2
          secret: c2VjcmV0IGlzIHNlY3VyZSwgaXMgaXQ/
    - aescbc:
        keys:
        - name: key1
          secret: c2VjcmV0IGlzIHNlY3VyZQ==
`,
	}
   137  
	// v1CRDVersion is a CRD version list containing only v1, which is both
	// served and the storage version. Its schema is an object with a single
	// string property, hostPort.
	v1CRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{
		{
			Name:    "v1",
			Served:  true,
			Storage: true,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"hostPort": {Type: "string"},
					},
				},
			},
		},
	}
	// v2CRDVersion is a CRD version list that serves both v2 and v1 while v1
	// remains the storage version. The v2 schema splits the value into two
	// string properties, host and port; v1 keeps the single hostPort property.
	v2CRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{
		{
			Name:    "v2",
			Served:  true,
			Storage: false,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"host": {Type: "string"},
						"port": {Type: "string"},
					},
				},
			},
		},
		{
			Name:    "v1",
			Served:  true,
			Storage: true,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"hostPort": {Type: "string"},
					},
				},
			},
		},
	}
	// v2StorageCRDVersion is a CRD version list that serves both v1 and v2 and
	// makes v2 the storage version (v1 is served but no longer stored).
	v2StorageCRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{
		{
			Name:    "v1",
			Served:  true,
			Storage: false,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"hostPort": {Type: "string"},
					},
				},
			},
		},
		{
			Name:    "v2",
			Served:  true,
			Storage: true,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"host": {Type: "string"},
						"port": {Type: "string"},
					},
				},
			},
		},
	}
	// v1NotServingCRDVersion is a CRD version list in which v1 is still listed
	// but neither served nor stored, and v2 is served and is the storage
	// version.
	v1NotServingCRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{
		{
			Name:    "v1",
			Served:  false,
			Storage: false,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"hostPort": {Type: "string"},
					},
				},
			},
		},
		{
			Name:    "v2",
			Served:  true,
			Storage: true,
			Schema: &apiextensionsv1.CustomResourceValidation{
				OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]apiextensionsv1.JSONSchemaProps{
						"host": {Type: "string"},
						"port": {Type: "string"},
					},
				},
			},
		},
	}
   240  )
   241  
// svmTest bundles the state shared by the storage version migrator test
// helpers. svmSetup fills in every field except apiextensionsclient, which is
// set by createCRD.
//
// policyFile and logFile are the audit policy and audit log temp files handed
// to the API server; client, clientConfig and dynamicClient talk to the test
// API server held in server; storageConfig is the etcd storage configuration
// the server was started with; filePathForEncryptionConfig is the temporary
// directory that contains the encryption configuration file.
type svmTest struct {
	policyFile                  *os.File
	logFile                     *os.File
	client                      clientset.Interface
	clientConfig                *rest.Config
	dynamicClient               *dynamic.DynamicClient
	storageConfig               *storagebackend.Config
	server                      *kubeapiservertesting.TestServer
	apiextensionsclient         *apiextensionsclientset.Clientset
	filePathForEncryptionConfig string
}
   253  
// svmSetup starts a test API server configured with the initial encryption
// configuration and audit logging, builds the clients, starts the garbage
// collector, the storage version migrator controller and the resource version
// controller against it, and returns the resulting svmTest.
//
// Any setup failure aborts the test via t.Fatalf. A cleanup function is
// registered on t that tears the server down and removes the audit files and
// the encryption config directory.
func svmSetup(ctx context.Context, t *testing.T) *svmTest {
	t.Helper()

	// The returned value is the directory that holds the encryption config file.
	filePathForEncryptionConfig, err := createEncryptionConfig(t, resources["initialEncryptionConfig"])
	if err != nil {
		t.Fatalf("failed to create encryption config: %v", err)
	}

	policyFile, logFile := setupAudit(t)
	// Enable automatic reload of the encryption config and blocking audit
	// logging into logFile.
	apiServerFlags := []string{
		"--encryption-provider-config", filepath.Join(filePathForEncryptionConfig, encryptionConfigFileName),
		"--encryption-provider-config-automatic-reload=true",
		"--disable-admission-plugins", "ServiceAccount",
		"--audit-policy-file", policyFile.Name(),
		"--audit-log-version", "audit.k8s.io/v1",
		"--audit-log-mode", "blocking",
		"--audit-log-path", logFile.Name(),
	}
	storageConfig := framework.SharedEtcd()
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig)

	clientSet, err := clientset.NewForConfig(server.ClientConfig)
	if err != nil {
		t.Fatalf("error in create clientset: %v", err)
	}

	// The REST mapper uses a cached discovery client; the resource version
	// controller gets its own, uncached discovery client.
	discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
	rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig)
	if err != nil {
		t.Fatalf("failed to create discovery client: %v", err)
	}
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
	restMapper.Reset()
	metadataClient, err := metadata.NewForConfig(server.ClientConfig)
	if err != nil {
		t.Fatalf("failed to create metadataClient: %v", err)
	}
	dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
	if err != nil {
		t.Fatalf("error in create dynamic client: %v", err)
	}
	sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
	// An already-closed channel is handed to the garbage collector.
	// NOTE(review): presumably this signals "informers already started" — confirm
	// against garbagecollector.NewGarbageCollector.
	alwaysStarted := make(chan struct{})
	close(alwaysStarted)

	gc, err := garbagecollector.NewGarbageCollector(
		ctx,
		clientSet,
		metadataClient,
		restMapper,
		garbagecollector.DefaultIgnoredResources(),
		informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
		alwaysStarted,
	)
	if err != nil {
		t.Fatalf("error while creating garbage collector: %v", err)

	}
	// startGC launches the garbage collector goroutines and resets the REST
	// mapper every syncPeriod until ctx is done.
	startGC := func() {
		syncPeriod := 5 * time.Second
		go wait.Until(func() {
			restMapper.Reset()
		}, syncPeriod, ctx.Done())
		go gc.Run(ctx, 1)
		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
	}

	// The SVM controller is given the garbage collector's dependency graph builder.
	svmController := storageversionmigrator.NewSVMController(
		ctx,
		clientSet,
		dynamicClient,
		sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
		names.StorageVersionMigratorController,
		restMapper,
		gc.GetDependencyGraphBuilder(),
	)

	rvController := storageversionmigrator.NewResourceVersionController(
		ctx,
		clientSet,
		rvDiscoveryClient,
		metadataClient,
		sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
		restMapper,
	)

	// Start informer and controllers
	sharedInformers.Start(ctx.Done())
	startGC()
	go svmController.Run(ctx)
	go rvController.Run(ctx)

	svmTest := &svmTest{
		storageConfig:               storageConfig,
		server:                      server,
		client:                      clientSet,
		clientConfig:                server.ClientConfig,
		dynamicClient:               dynamicClient,
		policyFile:                  policyFile,
		logFile:                     logFile,
		filePathForEncryptionConfig: filePathForEncryptionConfig,
	}

	// Tear the server down first, then remove the files it was started with.
	// Note that err here is the outer variable captured by the closure.
	t.Cleanup(func() {
		server.TearDownFn()
		utiltesting.CloseAndRemove(t, svmTest.logFile)
		utiltesting.CloseAndRemove(t, svmTest.policyFile)
		err = os.RemoveAll(svmTest.filePathForEncryptionConfig)
		if err != nil {
			t.Errorf("error while removing temp directory: %v", err)
		}
	})

	return svmTest
}
   370  
   371  func createEncryptionConfig(t *testing.T, encryptionConfig string) (
   372  	filePathForEncryptionConfig string,
   373  	err error,
   374  ) {
   375  	t.Helper()
   376  	tempDir, err := os.MkdirTemp("", svmName)
   377  	if err != nil {
   378  		return "", fmt.Errorf("failed to create temp directory: %w", err)
   379  	}
   380  
   381  	if err = os.WriteFile(filepath.Join(tempDir, encryptionConfigFileName), []byte(encryptionConfig), 0644); err != nil {
   382  		err = os.RemoveAll(tempDir)
   383  		if err != nil {
   384  			t.Errorf("error while removing temp directory: %v", err)
   385  		}
   386  		return tempDir, fmt.Errorf("error while writing encryption config: %w", err)
   387  	}
   388  
   389  	return tempDir, nil
   390  }
   391  
   392  func (svm *svmTest) createSecret(ctx context.Context, t *testing.T, name, namespace string) (*corev1.Secret, error) {
   393  	t.Helper()
   394  	secret := &corev1.Secret{
   395  		ObjectMeta: metav1.ObjectMeta{
   396  			Name:      name,
   397  			Namespace: namespace,
   398  		},
   399  		Data: map[string][]byte{
   400  			secretKey: []byte(secretVal),
   401  		},
   402  	}
   403  
   404  	return svm.client.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{})
   405  }
   406  
   407  func (svm *svmTest) getRawSecretFromETCD(t *testing.T, name, namespace string) ([]byte, error) {
   408  	t.Helper()
   409  	secretETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, "", "secrets", name, namespace)
   410  	etcdResponse, err := svm.readRawRecordFromETCD(t, secretETCDPath)
   411  	if err != nil {
   412  		return nil, fmt.Errorf("failed to read %s from etcd: %w", secretETCDPath, err)
   413  	}
   414  	return etcdResponse.Kvs[0].Value, nil
   415  }
   416  
   417  func (svm *svmTest) getETCDPathForResource(t *testing.T, storagePrefix, group, resource, name, namespaceName string) string {
   418  	t.Helper()
   419  	groupResource := resource
   420  	if group != "" {
   421  		groupResource = fmt.Sprintf("%s/%s", group, resource)
   422  	}
   423  	if namespaceName == "" {
   424  		return fmt.Sprintf("/%s/%s/%s", storagePrefix, groupResource, name)
   425  	}
   426  	return fmt.Sprintf("/%s/%s/%s/%s", storagePrefix, groupResource, namespaceName, name)
   427  }
   428  
   429  func (svm *svmTest) readRawRecordFromETCD(t *testing.T, path string) (*clientv3.GetResponse, error) {
   430  	t.Helper()
   431  	rawClient, etcdClient, err := integration.GetEtcdClients(svm.server.ServerOpts.Etcd.StorageConfig.Transport)
   432  	if err != nil {
   433  		return nil, fmt.Errorf("failed to create etcd client: %w", err)
   434  	}
   435  	// kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to
   436  	// close the client (which we can do by closing rawClient).
   437  	defer func() {
   438  		if err := rawClient.Close(); err != nil {
   439  			t.Errorf("error closing rawClient: %v", err)
   440  		}
   441  	}()
   442  
   443  	response, err := etcdClient.Get(context.Background(), path, clientv3.WithPrefix())
   444  	if err != nil {
   445  		return nil, fmt.Errorf("failed to retrieve secret from etcd %w", err)
   446  	}
   447  
   448  	return response, nil
   449  }
   450  
   451  func (svm *svmTest) getRawCRFromETCD(t *testing.T, name, namespace, crdGroup, crdName string) ([]byte, error) {
   452  	t.Helper()
   453  	crdETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, crdGroup, crdName, name, namespace)
   454  	etcdResponse, err := svm.readRawRecordFromETCD(t, crdETCDPath)
   455  	if err != nil {
   456  		t.Fatalf("failed to read %s from etcd: %v", crdETCDPath, err)
   457  	}
   458  	return etcdResponse.Kvs[0].Value, nil
   459  }
   460  
   461  func (svm *svmTest) updateFile(t *testing.T, configDir, filename string, newContent []byte) {
   462  	t.Helper()
   463  	// Create a temporary file
   464  	tempFile, err := os.CreateTemp(configDir, "tempfile")
   465  	if err != nil {
   466  		t.Fatal(err)
   467  	}
   468  	defer func() {
   469  		if err := tempFile.Close(); err != nil {
   470  			t.Errorf("error closing tempFile: %v", err)
   471  		}
   472  	}()
   473  
   474  	// Write the new content to the temporary file
   475  	_, err = tempFile.Write(newContent)
   476  	if err != nil {
   477  		t.Fatal(err)
   478  	}
   479  
   480  	// Atomically replace the original file with the temporary file
   481  	err = os.Rename(tempFile.Name(), filepath.Join(configDir, filename))
   482  	if err != nil {
   483  		t.Fatal(err)
   484  	}
   485  }
   486  
   487  // func (svm *svmTest) createSVMResource(ctx context.Context, t *testing.T, name string) (
   488  // 	*svmv1alpha1.StorageVersionMigration,
   489  // 	error,
   490  // ) {
   491  // 	t.Helper()
   492  // 	svmResource := &svmv1alpha1.StorageVersionMigration{
   493  // 		ObjectMeta: metav1.ObjectMeta{
   494  // 			Name: name,
   495  // 		},
   496  // 		Spec: svmv1alpha1.StorageVersionMigrationSpec{
   497  // 			Resource: svmv1alpha1.GroupVersionResource{
   498  // 				Group:    "",
   499  // 				Version:  "v1",
   500  // 				Resource: "secrets",
   501  // 			},
   502  // 		},
   503  // 	}
   504  //
   505  // 	return svm.client.StoragemigrationV1alpha1().
   506  // 		StorageVersionMigrations().
   507  // 		Create(ctx, svmResource, metav1.CreateOptions{})
   508  // }
   509  
   510  func (svm *svmTest) createSVMResource(ctx context.Context, t *testing.T, name string, gvr svmv1alpha1.GroupVersionResource) (
   511  	*svmv1alpha1.StorageVersionMigration,
   512  	error,
   513  ) {
   514  	t.Helper()
   515  	svmResource := &svmv1alpha1.StorageVersionMigration{
   516  		ObjectMeta: metav1.ObjectMeta{
   517  			Name: name,
   518  		},
   519  		Spec: svmv1alpha1.StorageVersionMigrationSpec{
   520  			Resource: svmv1alpha1.GroupVersionResource{
   521  				Group:    gvr.Group,
   522  				Version:  gvr.Version,
   523  				Resource: gvr.Resource,
   524  			},
   525  		},
   526  	}
   527  
   528  	return svm.client.StoragemigrationV1alpha1().
   529  		StorageVersionMigrations().
   530  		Create(ctx, svmResource, metav1.CreateOptions{})
   531  }
   532  
   533  func (svm *svmTest) getSVM(ctx context.Context, t *testing.T, name string) (
   534  	*svmv1alpha1.StorageVersionMigration,
   535  	error,
   536  ) {
   537  	t.Helper()
   538  	return svm.client.StoragemigrationV1alpha1().
   539  		StorageVersionMigrations().
   540  		Get(ctx, name, metav1.GetOptions{})
   541  }
   542  
   543  func setupAudit(t *testing.T) (
   544  	policyFile *os.File,
   545  	logFile *os.File,
   546  ) {
   547  	t.Helper()
   548  	// prepare audit policy file
   549  	policyFile, err := os.CreateTemp("", auditPolicyFileName)
   550  	if err != nil {
   551  		t.Fatalf("Failed to create audit policy file: %v", err)
   552  	}
   553  	if _, err := policyFile.Write([]byte(resources["auditPolicy"])); err != nil {
   554  		t.Fatalf("Failed to write audit policy file: %v", err)
   555  	}
   556  
   557  	// prepare audit log file
   558  	logFile, err = os.CreateTemp("", auditLogFileName)
   559  	if err != nil {
   560  		t.Fatalf("Failed to create audit log file: %v", err)
   561  	}
   562  
   563  	return policyFile, logFile
   564  }
   565  
   566  func (svm *svmTest) getAutomaticReloadSuccessTotal(ctx context.Context, t *testing.T) int {
   567  	t.Helper()
   568  
   569  	copyConfig := rest.CopyConfig(svm.server.ClientConfig)
   570  	copyConfig.GroupVersion = &schema.GroupVersion{}
   571  	copyConfig.NegotiatedSerializer = unstructuredscheme.NewUnstructuredNegotiatedSerializer()
   572  	rc, err := rest.RESTClientFor(copyConfig)
   573  	if err != nil {
   574  		t.Fatalf("Failed to create REST client: %v", err)
   575  	}
   576  
   577  	body, err := rc.Get().AbsPath("/metrics").DoRaw(ctx)
   578  	if err != nil {
   579  		t.Fatal(err)
   580  	}
   581  
   582  	metricRegex := regexp.MustCompile(fmt.Sprintf(`%s{.*} (\d+)`, metricPrefix))
   583  	for _, line := range strings.Split(string(body), "\n") {
   584  		if strings.HasPrefix(line, metricPrefix) {
   585  			matches := metricRegex.FindStringSubmatch(line)
   586  			if len(matches) == 2 {
   587  				metricValue, err := strconv.Atoi(matches[1])
   588  				if err != nil {
   589  					t.Fatalf("Failed to convert metric value to integer: %v", err)
   590  				}
   591  				return metricValue
   592  			}
   593  		}
   594  	}
   595  
   596  	return 0
   597  }
   598  
   599  func (svm *svmTest) isEncryptionConfigFileUpdated(ctx context.Context, t *testing.T, metricBeforeUpdate int) bool {
   600  	t.Helper()
   601  
   602  	err := wait.PollUntilContextTimeout(
   603  		ctx,
   604  		500*time.Millisecond,
   605  		wait.ForeverTestTimeout,
   606  		true,
   607  		func(ctx context.Context) (bool, error) {
   608  			metric := svm.getAutomaticReloadSuccessTotal(ctx, t)
   609  			return metric == (metricBeforeUpdate + 1), nil
   610  		},
   611  	)
   612  
   613  	return err == nil
   614  }
   615  
// waitForResourceMigration polls every 500ms, for up to
// wait.ForeverTestTimeout, until the migration named svmName is considered
// done, and reports whether that happened. It checks following conditions:
// 1. The svm resource has a non-empty status resource version and the
// MigrationSucceeded condition.
// 2. The audit log contains the storage-version-migrator patch event for the
// secret called name in the default namespace.
//
// expectedEvents is compared against the number of audit events checked; see
// the note next to that comparison. API or file errors inside the poll abort
// the test via t.Fatalf.
func (svm *svmTest) waitForResourceMigration(
	ctx context.Context,
	t *testing.T,
	svmName, name string,
	expectedEvents int,
) bool {
	t.Helper()

	// isMigrated lives outside the poll closure, so once it is set to true it
	// stays true across iterations unless the audit check below resets it.
	var isMigrated bool
	err := wait.PollUntilContextTimeout(
		ctx,
		500*time.Millisecond,
		wait.ForeverTestTimeout,
		true,
		func(ctx context.Context) (bool, error) {
			svmResource, err := svm.getSVM(ctx, t, svmName)
			if err != nil {
				t.Fatalf("Failed to get SVM resource: %v", err)
			}
			// Nothing to check until the status carries a resource version.
			if svmResource.Status.ResourceVersion == "" {
				return false, nil
			}

			if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
				isMigrated = true
			}

			// We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration.
			// However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes.
			// To expedite the update of the GC cache, we create a dummy secret and then promptly delete it.
			// This action forces the GC to refresh its cache, enabling us to proceed with the migration.
			_, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
			if err != nil {
				t.Fatalf("Failed to create secret: %v", err)
			}
			err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{})
			if err != nil {
				t.Fatalf("Failed to delete secret: %v", err)
			}

			// Re-open the audit log by name on every iteration and scan it for
			// the expected patch event.
			stream, err := os.Open(svm.logFile.Name())
			if err != nil {
				t.Fatalf("Failed to open audit log file: %v", err)
			}
			defer func() {
				if err := stream.Close(); err != nil {
					t.Errorf("error	while closing audit log file: %v", err)
				}
			}()

			missingReport, err := utils.CheckAuditLines(
				stream,
				[]utils.AuditEvent{
					{
						Level:             auditinternal.LevelMetadata,
						Stage:             auditinternal.StageResponseComplete,
						RequestURI:        fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
						Verb:              "patch",
						Code:              200,
						User:              "system:apiserver",
						Resource:          "secrets",
						Namespace:         "default",
						AuthorizeDecision: "allow",
						RequestObject:     false,
						ResponseObject:    false,
					},
				},
				auditv1.SchemeGroupVersion,
			)
			if err != nil {
				t.Fatalf("Failed to check audit log: %v", err)
			}
			// The migration is treated as not done when the expected patch
			// event is missing and more audit events than expectedEvents were
			// checked.
			// NOTE(review): the intent of the expectedEvents comparison is not
			// evident from this file — confirm against the callers.
			if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) {
				isMigrated = false
			}

			return isMigrated, nil
		},
	)
	if err != nil {
		return false
	}

	return isMigrated
}
   704  
   705  func (svm *svmTest) createCRD(
   706  	t *testing.T,
   707  	name, group string,
   708  	certCtx *certContext,
   709  	crdVersions []apiextensionsv1.CustomResourceDefinitionVersion,
   710  ) *apiextensionsv1.CustomResourceDefinition {
   711  	t.Helper()
   712  	pluralName := name + "s"
   713  	listKind := name + "List"
   714  
   715  	crd := &apiextensionsv1.CustomResourceDefinition{
   716  		ObjectMeta: metav1.ObjectMeta{
   717  			Name: pluralName + "." + group,
   718  		},
   719  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   720  			Group: group,
   721  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   722  				Kind:     name,
   723  				ListKind: listKind,
   724  				Plural:   pluralName,
   725  				Singular: name,
   726  			},
   727  			Scope:    apiextensionsv1.NamespaceScoped,
   728  			Versions: crdVersions,
   729  			Conversion: &apiextensionsv1.CustomResourceConversion{
   730  				Strategy: apiextensionsv1.WebhookConverter,
   731  				Webhook: &apiextensionsv1.WebhookConversion{
   732  					ClientConfig: &apiextensionsv1.WebhookClientConfig{
   733  						CABundle: certCtx.signingCert,
   734  						URL: ptr.To(
   735  							fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler),
   736  						),
   737  					},
   738  					ConversionReviewVersions: []string{"v1", "v2"},
   739  				},
   740  			},
   741  			PreserveUnknownFields: false,
   742  		},
   743  	}
   744  
   745  	apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig)
   746  	if err != nil {
   747  		t.Fatalf("Failed to create apiextensions client: %v", err)
   748  	}
   749  	svm.apiextensionsclient = apiextensionsclient
   750  
   751  	etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
   752  	return crd
   753  }
   754  
   755  func (svm *svmTest) updateCRD(
   756  	ctx context.Context,
   757  	t *testing.T,
   758  	crdName string,
   759  	updatesCRDVersions []apiextensionsv1.CustomResourceDefinitionVersion,
   760  ) *apiextensionsv1.CustomResourceDefinition {
   761  	t.Helper()
   762  
   763  	var err error
   764  	_, err = crdintegration.UpdateV1CustomResourceDefinitionWithRetry(svm.apiextensionsclient, crdName, func(c *apiextensionsv1.CustomResourceDefinition) {
   765  		c.Spec.Versions = updatesCRDVersions
   766  	})
   767  	if err != nil {
   768  		t.Fatalf("Failed to update CRD: %v", err)
   769  	}
   770  
   771  	crd, err := svm.apiextensionsclient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
   772  	if err != nil {
   773  		t.Fatalf("Failed to get CRD: %v", err)
   774  	}
   775  
   776  	// TODO: wrap all actions after updateCRD with wait loops so we do not need this sleep
   777  	//  it is currently necessary because we update the CRD but do not otherwise guarantee that the updated config is active
   778  	time.Sleep(10 * time.Second)
   779  
   780  	return crd
   781  }
   782  
   783  func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured {
   784  	t.Helper()
   785  
   786  	crdResource := schema.GroupVersionResource{
   787  		Group:    crdGroup,
   788  		Version:  version,
   789  		Resource: crdName + "s",
   790  	}
   791  
   792  	crdUnstructured := &unstructured.Unstructured{
   793  		Object: map[string]interface{}{
   794  			"apiVersion": crdResource.GroupVersion().String(),
   795  			"kind":       crdName,
   796  			"metadata": map[string]interface{}{
   797  				"name":      crName,
   798  				"namespace": defaultNamespace,
   799  			},
   800  		},
   801  	}
   802  
   803  	crdUnstructured, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Create(ctx, crdUnstructured, metav1.CreateOptions{})
   804  	if err != nil {
   805  		t.Fatalf("Failed to create CR: %v", err)
   806  	}
   807  
   808  	return crdUnstructured
   809  }
   810  
   811  func (svm *svmTest) getCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured {
   812  	t.Helper()
   813  
   814  	crdResource := schema.GroupVersionResource{
   815  		Group:    crdGroup,
   816  		Version:  version,
   817  		Resource: crdName + "s",
   818  	}
   819  
   820  	cr, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Get(ctx, crName, metav1.GetOptions{})
   821  	if err != nil {
   822  		t.Fatalf("Failed to get CR: %v", err)
   823  	}
   824  
   825  	return cr
   826  }
   827  
   828  func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) error {
   829  	t.Helper()
   830  
   831  	crdResource := schema.GroupVersionResource{
   832  		Group:    crdGroup,
   833  		Version:  version,
   834  		Resource: crdName + "s",
   835  	}
   836  
   837  	_, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).List(ctx, metav1.ListOptions{})
   838  
   839  	return err
   840  }
   841  
   842  func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) {
   843  	t.Helper()
   844  	crdResource := schema.GroupVersionResource{
   845  		Group:    crdGroup,
   846  		Version:  version,
   847  		Resource: crdName + "s",
   848  	}
   849  	err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Delete(ctx, name, metav1.DeleteOptions{})
   850  	if err != nil {
   851  		t.Fatalf("Failed to delete CR: %v", err)
   852  	}
   853  }
   854  
   855  func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc {
   856  	t.Helper()
   857  	http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
   858  
   859  	block, _ := pem.Decode(certCtx.key)
   860  	if block == nil {
   861  		panic("failed to parse PEM block containing the key")
   862  	}
   863  	key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
   864  	if err != nil {
   865  		t.Fatalf("Failed to parse private key: %v", err)
   866  	}
   867  
   868  	blockCer, _ := pem.Decode(certCtx.cert)
   869  	if blockCer == nil {
   870  		panic("failed to parse PEM block containing the key")
   871  	}
   872  	webhookCert, err := x509.ParseCertificate(blockCer.Bytes)
   873  	if err != nil {
   874  		t.Fatalf("Failed to parse certificate: %v", err)
   875  	}
   876  
   877  	server := &http.Server{
   878  		Addr: fmt.Sprintf("127.0.0.1:%d", servicePort),
   879  		TLSConfig: &tls.Config{
   880  			Certificates: []tls.Certificate{
   881  				{
   882  					Certificate: [][]byte{webhookCert.Raw},
   883  					PrivateKey:  key,
   884  				},
   885  			},
   886  		},
   887  	}
   888  
   889  	go func() {
   890  		// skipping error handling here because this always returns a non-nil error.
   891  		// after Server.Shutdown, the returned error is ErrServerClosed.
   892  		_ = server.ListenAndServeTLS("", "")
   893  
   894  	}()
   895  
   896  	serverCtx, cancel := context.WithCancel(ctx)
   897  	go func(ctx context.Context, t *testing.T) {
   898  		<-ctx.Done()
   899  		// Context was cancelled, shutdown the server
   900  		if err := server.Shutdown(context.Background()); err != nil {
   901  			t.Logf("Failed to shutdown server: %v", err)
   902  		}
   903  	}(serverCtx, t)
   904  
   905  	return cancel
   906  }
   907  
   908  type certContext struct {
   909  	cert        []byte
   910  	key         []byte
   911  	signingCert []byte
   912  }
   913  
   914  func (svm *svmTest) setupServerCert(t *testing.T) *certContext {
   915  	t.Helper()
   916  	certDir, err := os.MkdirTemp("", "test-e2e-server-cert")
   917  	if err != nil {
   918  		t.Fatalf("Failed to create a temp dir for cert generation %v", err)
   919  	}
   920  	defer func(path string) {
   921  		err := os.RemoveAll(path)
   922  		if err != nil {
   923  			t.Fatalf("Failed to remove temp dir %v", err)
   924  		}
   925  	}(certDir)
   926  	signingKey, err := utils.NewPrivateKey()
   927  	if err != nil {
   928  		t.Fatalf("Failed to create CA private key %v", err)
   929  	}
   930  	signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "e2e-server-cert-ca"}, signingKey)
   931  	if err != nil {
   932  		t.Fatalf("Failed to create CA cert for apiserver %v", err)
   933  	}
   934  	caCertFile, err := os.CreateTemp(certDir, "ca.crt")
   935  	if err != nil {
   936  		t.Fatalf("Failed to create a temp file for ca cert generation %v", err)
   937  	}
   938  	defer utiltesting.CloseAndRemove(&testing.T{}, caCertFile)
   939  	if err := os.WriteFile(caCertFile.Name(), utils.EncodeCertPEM(signingCert), 0644); err != nil {
   940  		t.Fatalf("Failed to write CA cert %v", err)
   941  	}
   942  	key, err := utils.NewPrivateKey()
   943  	if err != nil {
   944  		t.Fatalf("Failed to create private key for %v", err)
   945  	}
   946  	signedCert, err := utils.NewSignedCert(
   947  		&cert.Config{
   948  			CommonName: "127.0.0.1",
   949  			AltNames: cert.AltNames{
   950  				IPs: []net.IP{utilnet.ParseIPSloppy("127.0.0.1")},
   951  			},
   952  			Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
   953  		},
   954  		key, signingCert, signingKey,
   955  	)
   956  	if err != nil {
   957  		t.Fatalf("Failed to create cert%v", err)
   958  	}
   959  	certFile, err := os.CreateTemp(certDir, "server.crt")
   960  	if err != nil {
   961  		t.Fatalf("Failed to create a temp file for cert generation %v", err)
   962  	}
   963  	defer utiltesting.CloseAndRemove(&testing.T{}, certFile)
   964  	keyFile, err := os.CreateTemp(certDir, "server.key")
   965  	if err != nil {
   966  		t.Fatalf("Failed to create a temp file for key generation %v", err)
   967  	}
   968  	if err = os.WriteFile(certFile.Name(), utils.EncodeCertPEM(signedCert), 0600); err != nil {
   969  		t.Fatalf("Failed to write cert file %v", err)
   970  	}
   971  	privateKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(key)
   972  	if err != nil {
   973  		t.Fatalf("Failed to marshal key %v", err)
   974  	}
   975  	if err = os.WriteFile(keyFile.Name(), privateKeyPEM, 0644); err != nil {
   976  		t.Fatalf("Failed to write key file %v", err)
   977  	}
   978  	defer utiltesting.CloseAndRemove(&testing.T{}, keyFile)
   979  	return &certContext{
   980  		cert:        utils.EncodeCertPEM(signedCert),
   981  		key:         privateKeyPEM,
   982  		signingCert: utils.EncodeCertPEM(signingCert),
   983  	}
   984  }
   985  
   986  func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bool {
   987  	t.Helper()
   988  
   989  	data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
   990  	if err != nil {
   991  		t.Fatalf("Failed to get CR from etcd: %v", err)
   992  	}
   993  
   994  	// parse data to unstructured.Unstructured
   995  	obj := &unstructured.Unstructured{}
   996  	err = obj.UnmarshalJSON(data)
   997  	if err != nil {
   998  		t.Fatalf("Failed to unmarshal data to unstructured: %v", err)
   999  	}
  1000  
  1001  	return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
  1002  }
  1003  
  1004  func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool {
  1005  	t.Helper()
  1006  
  1007  	err := wait.PollUntilContextTimeout(
  1008  		ctx,
  1009  		500*time.Millisecond,
  1010  		1*time.Minute,
  1011  		true,
  1012  		func(ctx context.Context) (bool, error) {
  1013  			triggerCR := svm.createCR(ctx, t, "triggercr", "v1")
  1014  			svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
  1015  			svmResource, err := svm.getSVM(ctx, t, crdSVMName)
  1016  			if err != nil {
  1017  				t.Fatalf("Failed to get SVM resource: %v", err)
  1018  			}
  1019  			if svmResource.Status.ResourceVersion == "" {
  1020  				return false, nil
  1021  			}
  1022  
  1023  			if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
  1024  				return true, nil
  1025  			}
  1026  
  1027  			return false, nil
  1028  		},
  1029  	)
  1030  	return err == nil
  1031  }
  1032  
  1033  type versions struct {
  1034  	generation  int64
  1035  	rv          string
  1036  	isRVUpdated bool
  1037  }
  1038  
  1039  func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) {
  1040  	t.Helper()
  1041  
  1042  	for crName, version := range crVersions {
  1043  		// get CR from etcd
  1044  		data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
  1045  		if err != nil {
  1046  			t.Fatalf("Failed to get CR from etcd: %v", err)
  1047  		}
  1048  
  1049  		// parse data to unstructured.Unstructured
  1050  		obj := &unstructured.Unstructured{}
  1051  		err = obj.UnmarshalJSON(data)
  1052  		if err != nil {
  1053  			t.Fatalf("Failed to unmarshal data to unstructured: %v", err)
  1054  		}
  1055  
  1056  		// validate resourceVersion and generation
  1057  		crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion()
  1058  		if version.isRVUpdated && crVersion == version.rv {
  1059  			t.Fatalf("ResourceVersion of CR %s should not be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
  1060  		}
  1061  		if obj.GetGeneration() != version.generation {
  1062  			t.Fatalf("Generation of CR %s should be equal. Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration())
  1063  		}
  1064  	}
  1065  }
  1066  

View as plain text